Building DataFlow Applications : Building DataFlow Applications in Java : DataFlow Operator Library : Performing I/O Operations
 
Share this page                  
Performing I/O Operations
The DataFlow operator library contains several pre-built I/O operators. This section covers each of those operators and provides details on how to use them.
Using ByteSource Implementations
Using ByteSink Implementations
Read Operators
Using the ReadAvro Operator to Read Apache Avro Data
Using the ReadORC Operator to Read Apache ORC Files
Using the ReadMDF Operator to Read MDF Files
Using the ReadParquet Operator to Read Apache Parquet Files
Using the ReadFromJDBC Operator to Read Databases
Using the ReadDelimitedText Operator to Read Delimited Text
Using the ReadFixedText Operator to Read Fixed-width Text
Using the ReadSource Operator to Read Generic Sources
Using the ReadLog Operator to Read Log Data
Using the ReadARFF Operator to Read Sparse Data
Using the ReadStagingDataset Operator to Read Staging Datasets
Using the ParseTextFields Operator to Parse Text Records
Using the ReadJSON Operator to Read JSON Text
Write Operators
Using the WriteAvro Operator to Write Apache Avro Data
Using the WriteToJDBC Operator to Write to Databases
Using the DeleteFromJDBC Operator to Write Database Deletes
Using the UpdateInJDBC Operator to Write Database Updates
Using the WriteDelimitedText Operator to Write Delimited Text
Using the WriteFixedText Operator to Write Fixed-width Text
Using the WriteSink Operator to Write Generic Targets
Using the WriteStagingDataset Operator to Write Staging Data Sets
Using the WriteARFF Operator to Write Sparse Data
Using the ForceRecordStaging Operator to Explicitly Stage Data
Using the WriteORC Operator to Write Apache ORC Files
Database Loaders
Using the LoadActianVector Operator to Load Actian Vector
Using the LoadMatrix Operator to Load Actian Matrix
Using the MatrixSink Operator to Load Actian Matrix from SQL
HBase Operators
Using the DeleteHBase Operator to Delete from HBase
Using the ReadHBase Operator to Read from HBase
Using the WriteHBase Operator to Write to HBase
Using ByteSource Implementations
The ByteSource interface defines a way to obtain binary data from data sources for downstream decoding, parsing, and data extraction. Separating this interface from the specific read operators allows readers to access source data in many different ways without needing to write a new operator for each possible access path.
File-based readers expose convenience methods for setting the ByteSource to use as either a Path or a String containing the path to the files to access. Underneath, the Path and String implementations are converted into ByteSource objects for use in the lower level I/O functions.
In general, the Path or String interfaces for the file-based read operators will provide the functionality needed. For example, wildcard characters can be used to specify a pattern of files to read. Likewise, passing in a directory implies all files in the directory should be read as one logical unit. However, sometimes it will be necessary to use a ByteSource directly. The ByteSource implementations are covered in detail with usage examples in this section.
For more information about the ByteSource interface and the classes that implement it, see the JavaDoc reference documentation.
BasicByteSource
A BasicByteSource represents reading data from a single Path that can be provided as either a Path or String parameter. If the Path is a directory, it represents reading from all files in the directory. These interfaces provide no more functionality than using a reader directly with a Path for the path name.
A BasicByteSource will also detect compressed data automatically based on file suffixes, although additional constructors provide the ability to specifically set the compression format.
A constructor is also provided that allows specifying an action to take if an input file cannot be read. This is useful for situations where a directory path name is given and a subset of the files contained within it cannot be accessed for reading.
Following is an example of creating a byte source that supports compression (meaning the data being read is already compressed and should be decompressed during the reading process).
Using the BasicByteSource operator with Compression
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.Paths;
import com.pervasive.datarush.io.compression.CompressionFormats;
import com.pervasive.datarush.operators.io.BasicByteSource;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.sink.LogRows;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;

/**
 * Read a file that is compressed.
 * 
 */
public class ReadCompressedRatings {
    public static void main(String[] args) {

        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ReadRatings");
        
        // List the supported compression formats
        System.out.println("supported formats: " + CompressionFormats.listSupportedFormats());

        // Create the byte source for the compressed data
        BasicByteSource source = 
            new BasicByteSource(
                Paths.asPath("data/ratings.txt.gz"),
                CompressionFormats.lookupFormat("gzip"));
        
        // Create the reader using the byte source
        ReadDelimitedText reader = graph.add(new ReadDelimitedText());
        reader.setSource(source);
        reader.setFieldSeparator("::");
        reader.setHeader(true);
        RecordTokenType ratingsType = record(
                INT("userID"),
                INT("movieID"),
                INT("rating"),
                STRING("timestamp"));
        reader.setSchema(TextRecord.convert(ratingsType));
        
        // Log results
        LogRows logger = graph.add(new LogRows(1));
        graph.connect(reader.getOutput(), logger.getInput());
        
        // Compile and run the graph
        graph.run();
    }
}
Notes about the previous example:
The list of supported compression formats is printed out. Compression formats are extensible, so custom formats can be added as needed.
A BasicByteSource is constructed. In this case we need to provide a Path to the files to read and the compression format to use. The helper class CompressionFormats is used to obtain the CompressionFormat implementation for the "gzip" compression type. Since here the source name ends with ".gz", providing a format is not strictly required; the ByteSource would automatically detect the format in this case.
The created byte source is passed to the reader as the source property. The reader will use the byte source to obtain binary data for decoding, parsing, and data extraction. The data source will decompress the input data before passing it on to the reader. This allows the reader to be agnostic as to what pre-processing must take place on the data before data parsing is applied.
ConcatenatedByteSource
A ConcatenatedByteSource combines several ByteSource objects together into a single ByteSource. When read, the source produces the data from each underlying source in turn, resulting in a concatenation of the contents.
A typical usage is to read a provided list of files. Usually a list of files can be read by either reading all of the files in a directory or using wildcards in file names to read files that match a specific pattern. At times this functionality may not be sufficient. In this case, a list of files can be generated and used to construct a ConcatenatedByteSource.
As with the BasicByteSource, once constructed, the byte source is handed to a reader as the source property. The reader then consumes the bytes of the files from the source and processes them as needed.
Following is an example using a ConcatenatedByteSource. The example is contrived because the files are within the same directory and could have been specified using a wildcard pattern in the file name.
Using the ConcatenatedByteSource operator
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.LONG;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.TIMESTAMP;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import java.util.Arrays;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.Path;
import com.pervasive.datarush.io.Paths;
import com.pervasive.datarush.operators.io.ByteSource;
import com.pervasive.datarush.operators.io.ConcatenatedByteSource;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.sink.LogRows;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;

/**
 * Read a list of files as one logical unit.
 * 
 */
public class ReadMultipleFiles {
    public static void main(String[] args) {

        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ReadStaging");
        
        // Create the byte source for the list of files to read
        Path[] paths = new Path[] {
                Paths.asPath("data/logs1.txt"),
                Paths.asPath("data/logs2.txt"),
                Paths.asPath("data/logs3.txt"),
        };
        ByteSource source = ConcatenatedByteSource.getConcatenatedSource(Arrays.asList(paths));
        
        // Create the reader using the byte source
        ReadDelimitedText reader = graph.add(new ReadDelimitedText());
        reader.setSource(source);
        reader.setFieldSeparator("|");
        RecordTokenType type = record(
            STRING("entityID"),
            TIMESTAMP("timestamp"),
            LONG("siteID"),
            INT("compromised"),
            LONG("eventID"));
        reader.setSchema(TextRecord.convert(type));
        
        // Log results
        LogRows logger = graph.add(new LogRows(1));
        graph.connect(reader.getOutput(), logger.getInput());
        
        // Compile and run the graph
        graph.run();
    }
}
Notes about the ConcatenatedByteSource example:
A list of Path objects is assembled representing the files to read. This list can be built applying whatever means necessary. The example is building a simple list of files that exist in the same directory. However, the list of files could be from different directories with different naming schemes. The data contained in the files must all be of the same expected format; if a header were present, it would need to appear in each file.
The list of Path objects is converted into a ByteSource using the static method getConcatenatedSource(). This method creates the ConcatenatedByteSource from the list of Paths.
The byte source is set as the source property on the reader.
GlobbingByteSource
A GlobbingByteSource applies a file globbing pattern to the file name within a file path. Globbing is the capability of using wildcard characters within a file name to match the pattern to files in an actual file system. After the globbing is accomplished, a GlobbingByteSource acts similarly to a ConcatenatedByteSource, using the list of filtered paths as the files to be read as one logical unit.
The interfaces into the file read operators allow setting the source as a String. When this interface is used, the given String may contain any supported wildcard characters. The readers use a GlobbingByteSource in that case to resolve the actual files to be read.
The only reason then to use a GlobbingByteSource directly is to specify how to handle files that match the globbing pattern but are unreadable.
The followoing code example uses a GlobbingByteSource directly, setting the action to take when a source file is unreadable.
Using the GlobbingByteSource operator
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.LONG;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.TIMESTAMP;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.operators.io.ByteSource;
import com.pervasive.datarush.operators.io.GlobbingByteSource;
import com.pervasive.datarush.operators.io.UnreadableSourceAction;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.sink.LogRows;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;

/**
 * Read a list of files as one logical unit.
 * 
 */
public class ReadGlobbedFiles {
    public static void main(String[] args) {

        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ReadStaging");
        
        // Create a globbing byte source using a simple wildcard
        ByteSource source = new GlobbingByteSource("data/logs?.txt", UnreadableSourceAction.FAIL);
        
        // Create the reader using the byte source
        ReadDelimitedText reader = graph.add(new ReadDelimitedText());
        reader.setSource(source);
        reader.setFieldSeparator("|");
        RecordTokenType type = record(
                STRING("entityID"), 
                TIMESTAMP("timestamp"), 
                LONG("siteID"), 
                INT("compromised"), 
                LONG("eventID"));
        reader.setSchema(TextRecord.convert(type));
        
        // Log results
        LogRows logger = graph.add(new LogRows(1));
        graph.connect(reader.getOutput(), logger.getInput());
        
        // Compile and run the graph
        graph.run();
    }
}
Notes about the GlobbingByteSource example:
The GlobbingByteSource is created with a file path containing a wildcard pattern for the file name. The action for unreadable source files is set to FAIL. Any source files that cannot be accessed using the file name pattern will cause an exception.
The byte source is used as the source property for the reader. The reader will access all of the files matching the pattern as one logical unit of data.
Using ByteSink Implementations
The ByteSink interface defines a way to abstractly write byte data from a writer to a target file or set of files. Separating this interface from file-based writer operators allows writer implementations to target data without direct knowledge of where or how the data is being written.
File-based writers expose convenience methods for setting the ByteSink to use either a Path or a String containing the path to the files to write. Underneath, the Path and String implementations are converted into ByteSink objects for use in the lower level I/O functions.
In general, the Path or String interfaces for the file-based write operators provide the functionality needed. For example, passing in a directory implies each active stream of data in the application should write a unique file in the directory. This allows parallel writing to take place.
The only time the need arises to use a ByteSink directly is to compress the output data before writing. The BasicByteSink implementation is covered in detail below with usage examples.
For more information about the ByteSink interface and the classes that implement it, see the JavaDoc reference documentation.
BasicByteSink
The BasicByteSink implementation specifies the use and type of compression when writing the target files. The following code example shows how to create a BasicByteSink specifying the type of compression to use. After the sink is created, it can be passed to a file write operator as the Sink property.
Using the BasicByteSink operator
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.Paths;
import com.pervasive.datarush.io.WriteMode;
import com.pervasive.datarush.io.compression.CompressionFormats;
import com.pervasive.datarush.operators.io.BasicByteSink;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;

/**
 * Write the ratings file with gzip compression.
 */
public class WriteCompressedRatings {
    public static void main(String[] args) {

        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ReadStaging");
        
        // Create the reader using the byte source
        ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
        reader.setFieldSeparator("::");
        reader.setHeader(true);
        RecordTokenType ratingsType = record(
                INT("userID"), 
                INT("movieID"), 
                INT("rating"), 
                STRING("timestamp"));
        reader.setSchema(TextRecord.convert(ratingsType));
        
        // Write the ratings using compression
        BasicByteSink sink = 
            new BasicByteSink(
                    Paths.asPath("results/ratings.txt.gz"), 
                    CompressionFormats.lookupFormat("gzip"));
        
        WriteDelimitedText writer = graph.add(new WriteDelimitedText(sink, WriteMode.OVERWRITE));
        writer.setFieldSeparator("::");
        writer.setHeader(true);
        writer.setFieldDelimiter("");
        writer.setWriteSingleSink(true);    // Write a single output file (loss of parallelism)
        
        // Connect reader and writer
        graph.connect(reader.getOutput(), writer.getInput());
        
        // Compile and run the graph
        graph.run();
    }
}
Notes about the example:
A BasicByteSink is constructed using a Path and the "gzip" compression format.
The sink with compression enabled is passed as the Sink property to the writer’s constructor.
When the graph is executed, the data will be written in delimited text format and compressed using "gzip" compression.
Read Operators
Using the ReadAvro Operator to Read Apache Avro Data
Using the ReadORC Operator to Read Apache ORC Files
Using the ReadParquet Operator to Read Apache Parquet Files
Using the ReadFromJDBC Operator to Read Databases
Using the ReadDelimitedText Operator to Read Delimited Text
Using the ReadFixedText Operator to Read Fixed-width Text
Using the ReadSource Operator to Read Generic Sources
Using the ReadLog Operator to Read Log Data
Using the ReadARFF Operator to Read Sparse Data
Using the ReadStagingDataset Operator to Read Staging Datasets
Using the ParseTextFields Operator to Parse Text Records
Using the ReadJSON Operator to Read JSON Text
Using the ReadAvro Operator to Read Apache Avro Data
The ReadAvro operator reads a data file previously written using the Apache Avro serialization format. The Avro format is a commonly used binary format that offers data compression and the ability to be parsed in parallel. Metadata about the data, such as its schema and compression format, are serialized into the file, making it available to readers.
The operator will translate the Avro schema into appropriate DataFlow types when possible—some schemas are not supported for reading, as described later.
As DataFlow operates on records, it is generally expected that the source data will have a RECORD schema type. If this is not the case, the operator treats the schema as if it were a record with a single field named "value."
The output record type will have fields with the same names and in the same order as the source schema. Output fields are assigned a type based on the schema of the source field with the same name. In general, Avro schema types are assigned DataFlow types according to the following table.
Avro Schema Type
DataFlow Type
BOOLEAN
BOOLEAN
BYTES
BINARY
FIXED
BINARY
DOUBLE
DOUBLE
FLOAT
FLOAT
INT
INT
LONG
LONG
STRING
STRING
ENUM
STRING with domain using the declared set of symbols
For types not listed previously, the schema type may or may not be mapped to a DataFlow type. If attempting to read a source with schema types that cannot be mapped to DataFlow types, the operator will produce an error. The conditions under which other schema types are supported are as follows:
Source fields with ARRAY or MAP schema types are never supported.
Source fields with a RECORD schema type are supported only when reading Avro data written using the WriteAvro operator; fields with DataFlow types that do not have analogues in Avro are written as nested records. Source fields using these same schemas will be mapped back into the original DataFlow type.
Source fields with a UNION schema type is only supported if it is a union of exactly two schema types where one type is NULL. In this case, the type is determined using the non-NULL schema type of the union.
For information about creating files containing data in Avro format using DataFlow, see Using the WriteAvro Operator to Write Apache Avro Data.
When reading Avro files written by DataFlow, there may be additional metadata information about the data embedded within the files. If the reader has been configured to use this metadata, then it can obtain information about the ordering and partitioning of the data when it was written, which can eliminate the need to re-sort or partition the data.
Code Examples
Because Avro files are self-contained with respect to metadata, it is generally not necessary to provide any information other than the location of the data. Following is an example use of the operator in Java.
Using the ReadAvro operator in Java
ReadAvro reader = graph.add(new ReadAvro("data/ratings.avro"));
The following example demonstrates using the delimited text reader within RushScript.
var data = dr.readAvro({source:'data/ratings.avro'});
Properties
The ReadAvro operator supports the following properties:
Name
Type
Description
extraFieldAction
How to handle fields found when parsing the record, but not declared in the schema.
fieldErrorAction
How to handle fields that cannot be parsed.
includeSourceInfo
boolean
Determines whether output records will include additional fields detailing origin information for the record. If true, records will have three additional fields:
sourcePath – the path of the file from which the record originates. If this is not known, it will be NULL.
splitOffset – the offset of the starting byte of the containing split in the source data.
recordOffset – the offset of the first character of the record text from the start of the containing split.
If these names would collide with those defined in the source schema, they will be renamed so as to avoid collision. These fields are added as the first three of the output and are not affected by the selectedFields property.
missingFieldAction
How to handle fields declared in the schema, but not found when parsing the record. If the configured action does not discard the record, the missing fields will be null-valued in the output.
parseErrorAction
How to handle all parsing errors.
parseOptions
The parsing options used by the reader.
pessimisticSplitting
boolean
Configures whether pessimistic file-splitting must be used. By default, this is disabled. Pessimistic splitting defines one file split per file (that is, it assumes the input files are not splittable).
readBuffer
int
The size of the I/O buffer, in bytes, to use for reads. Default: 64K.
readOnClient
boolean
Determines whether reads are performed by the client or in the cluster. By default, reads are performed in the cluster if executed in a distributed context.
recordWarningThreshold
int
The maximum number of records that can have parse warnings before failing.
selectedFields
List<String>
The list of input fields to include in the output. Use this to limit the fields written to the output.
source
ByteSource, Path, or String
Source of the input data to parse as delimited text.
splitOptions
The configuration used in determining how to break the source into splits.
useMetadata
boolean
Whether the reader should use any discovered metadata about the ordering and distribution. Defaults to false.
Ports
The ReadAvro operator provides a single output port.
Name
Type
Get method
Description
output
getOutput()
Provides the record data to read and parse from the provided input data files (sources).
Using the ReadORC Operator to Read Apache ORC Files
The ReadORC operator reads a data file that is written earlier using the Apache Optimized Row Columnar (ORC) File format. The ORC format is supported by Apache Hive.
The ORC format is columnar-based but can also be autonomous. The columns in an ORC file separate the stripes or sections of the file. An internal index is used to track a section of the data within each column. This organization allows readers to efficiently omit the columns that are not required. Also, each column can apply a different compression method depending on the data type. Metadata about the ORC data, such as the schema and compression format are serialized into the file and are made available to the readers.
The operator translates the ORC file schema into appropriate DataFlow types when it is possible. A few ORC data types are not supported for reading. The columns with unsupported data types are omitted. The output record type has fields with the same names and in the same order as the source schema. The type in the output fields are assigned based on the schema of the source field with the same name.
In general, DataFlow types are assigned to the ORC schema types as shown in the following table.
DataFlow Type
ORC Schema Type
BOOLEAN
BOOLEAN
LONG
BIGINT
BINARY
BINARY
STRING
CHAR
DATE
DATE
NUMERIC
DECIMAL
DOUBLE
DOUBLE
FLOAT
FLOAT
INT
INT
INT
SMALLINT
STRING
STRING
INT
TINYINT
TIMESTAMP
TIMESTAMP
STRING
VARCHAR
Several ORC types are not supported by DataFlow. If these types are found in an ORC file, they will be ignored. The reader logs a message for all columns that are omitted having the unsupported data types. The following ORC data types are not supported:
LIST
MAP
STRUCT
UNION
Column Pruning
We recommend you limit the columns read to only the ones required for downstream processing, since ORC format is columnar. Use the selectedFields property to specify the fields to read. For more information, see Properties. The ORC columns not included in the list are not omitted. This optimization provides a performance boost especially for files containing a large number of columns.
Note:  Before running the workflow, ensure that the client configuration and the jar files are added to the classpath. For more information, see Integrating DataFlow with Hadoop.
Code Examples
It is required to provide the location of the data to the operator because the ORC files are self-contained about the metadata. We recommend you limit the columns read to only the ones required for downstream processing. Because ORC format is columnar, reducing the columns read might enhance performance. Use the selectFields property to specify the columns to read from a given ORC data set.
The following example provides reading the ORC file data using Java.
Using ReadORC in Java
ReadORC reader = graph.add(new ReadORC("data/ratings.orc"));
Using ReadORC in RushScript
var data = dr.readORC({source:'data/ratings.orc'});
Properties
The ReadORC operator supports the following properties.
Name
Type
Description
includeSourceInfo
boolean
Determines if the output records will include the additional fields having the origin information for the record. If true, records will have three additional fields:
sourcePath – path of the file that originates the record. If the path is unknown, then it is NULL.
splitOffset – offset of the starting byte in the source data containing the split.
recordOffset – offset of the first character in the record text from the start of the containing split.
If these names are the same as those defined in the source schema, they are renamed. These fields are added as the first three of the output and not impacted by the selectedFields property.
pessimisticSplitting
boolean
Defines whether pessimistic file splitting must be used. By default, this is disabled. Pessimistic splitting defines one file split per file. This means that the input files cannot be split.
readBuffer
int
Defines the size of the I/O buffer (in bytes) that is used for reads. Default: 64K.
readOnClient
boolean
Determines whether reads are performed by the client or within the cluster. If executed in a distributed context, the reads are performed in the cluster by default.
selectedFields
List<String>
Defines the list of input fields to be included in the output. This is used to limit the fields available on the output port of the reader. We recommend you use this property to limit the columns read to only those required for downstream processing.
source
ByteSource, Path, or String
Source of the input data to parse as delimited text.
splitOptions
Defines the configuration used to determine breaking the source into splits.
useMetadata
boolean
Determines whether the reader should use any discovered metadata on ordering and distribution. Default: false.
Ports
The ReadORC operator provides a single output port.
Name
Type
Get method
Description
output
getOutput()
Provides the record data read from the provided input data files (sources).
Using the ReadMDF Operator to Read MDF Files
This topic describes the DataFlow MDF read operator. For information about its KNIME node, see MDF Reader.
The ReadMDF operator reads a data file previously written using the ASAM MDF format. The MDF format is supported and maintained by ASAM.
MDF or Measurment Data Format is a binary file format used to store recorded and calculated data that is frequently used in post-measurement processing, off-line evaluation, or long-term storage.
It offers efficient and high performance storage of large amounts of measurement data. The file format allows the storage of the raw measurement data along with associated metadata and corresponding conversion formulas so that the raw data can still be interpreted correctly and utilized through post-processing.
The operator will translate the MDF schema into appropriate DataFlow types whenever possible, although because of the frequent usage of unsigned types in MDF data, sometimes the type used by DataFlow must be wider than the original type specified in the metadata to prevent loss of scale or precision.
Since DataFlow operates on the concept of homogenous data records within a given flow, a ReadMDF operator is only able to extract one record type from the file at a time, although multiple ReadMDF operators can read multiple records types concurrently from the same file.
The output record type will have fields with the names and types determined by the metadata provided in the file. The ordering of the fields will also correspond to the declaration order within the metadata, with the exception that the master channel will always be the first field even if it is not defined first.
Currently the operator only supports primitive types, which have an analog within DataFlow; therefore the extraction of MIME types in various media formats is not currently supported.
Code Examples
Since MDF files are self-contained with respect to metadata, it is generally not necessary to provide any information other than the location of the data and the data group containing the specified record that should be extracted. Following is an example use of the operator in Java.
Using the ReadMDF operator in Java
ReadMDF reader = graph.add(new ReadMDF("data/output.mf4"));
reader.setDataChannel(1);
reader.setRecordId(1);
The following example demonstrates using the MDF reader within RushScript:
var data = dr.readMDF({source:'data/output.mf4', dataChannel:1, recordId:1});
Properties
The ReadMDF operator supports the following properties:
Name
Type
Description
source
ByteSource, Path, or String
Source of the input data to parse as delimited text
dataChannel
int
The data group containing the record to read
recordId
long
The record ID to read within the specified data group. If the record ID is unspecified, it will attempt to read the first channel group.
convertRaw
boolean
Specifies whether the raw values should have any included conversion rules applied
version
MDFVersion
Sets the version of the MDF file. Currently, MDF version 4 is supported.
runMode
ExtractionMode
Sets the mode that will decide what will be extracted from the MDF file:
DATA - Extracts the data from the specified record.
ATTACHMENT - Extracts the raw binary data of any attachments.
METADATA - Extracts the metadata from the file.
Addtional properties are shared with delimited text.
Ports
The ReadMDF operator provides a single output port.
Name
Type
Get method
Description
output
getOutput()
Provides the record data read from the provided input data files (sources).
Using the ReadParquet Operator to Read Apache Parquet Files
The ReadParquet operator reads data that is written earlier using Apache Parquet format. The Parquet format is supported by Apache Hive.
Parquet is a columnar file format used to store the tabular form of data. Parquet supports compression and encoding the schemes effectively and allows specifying the compression schemes at each column level. It supports:
Source projects such as Apache Hadoop (MapReduce), Apache Hive, Impala, and so on, as it presents the data in columnar format
Compression codecs such as SNIPPY, GZIP, and LZO. The design allows integration with future types.
The ReadParquet <include link> operator uses Hive libraries through the shim layer and requires a Hadoop module configuration to be enabled, even if the workflow does not run on the cluster or access HDFS.
DataFlow automatically determines the equivalent data types from Parquet. The result is the output type of the reader. However, Parquet and DataFlow support different data types and not all data in Parquet format can be read. If it attempts to read data that cannot be represented in DataFlow, an error is returned.
The primitive Parquet types are mapped to DataFlow as shown in the following table.
DataFlow Type
Parquet Type
BOOLEAN
BOOLEAN
DOUBLE
DOUBLE
FLOAT
FLOAT
INT32
INT
INT64
LONG
BINARY
STRING
Note:  Before running the workflow, ensure that the client configuration and the jar files are added to the classpath. For more information, see Integrating DataFlow with Hadoop.
Code Examples
The following example provides reading the Parquet file data using Java.
Using ReadParquet in Java
// Path to read entire Hive table
ReadParquet reader = new ReadParquet("hdfs://10.100.10.41:8020//apps/hive/warehouse/CityParquet");

// Path to read specific partition of Hive table from HDFS
// ReadParquet reader = new ReadParquet("hdfs://10.100.10.41:8020//apps/hive/warehouse/CityParquet/000000_0");

// Path to read parquet file from Local file system
// ReadParquet reader = new ReadParquet("C:/Parquet/Cities.parquet");

graph.add(reader);
Using ReadParquet in RushScript
var data = dr.readParquet({source:"hdfs://10.100.10.41:8020//apps/hive/warehouse/CityParquet/000000_0"});
Properties
The ReadParquet operator supports the following properties.
Name
Type
Description
includeSourceInfo
boolean
Determines if the output records will include the additional fields having the origin information for the record. If true, records will have three additional fields:
sourcePath – path of the file that originates the record. If the path is unknown, then it is NULL.
splitOffset – offset of the starting byte in the source data containing the split.
recordOffset – offset of the first character in the record text from the start of the containing split.
If these names are the same as those defined in the source schema, then they are renamed. These fields are added as the first three of the output and not impacted by the selectedFields property.
pessimisticSplitting
boolean
Defines whether pessimistic file splitting must be used. By default, this is disabled. Pessimistic splitting defines one file split per file. This means that the input files cannot be split.
readBuffer
int
Defines the size of the I/O buffer (in bytes) that is used for reads. Default: 64K.
readOnClient
boolean
Determines whether reads are performed by the client or within the cluster. If executed in a distributed context, the reads are performed in the cluster by default.
selectedFields
List<String>
Defines the list of input fields to be included in the output. This is used to limit the fields available on the output port of the reader. We recommend you use this property to limit the columns read to only those required for downstream processing.
source
ByteSource, Path, or String
Source of the input data to parse as delimited text.
splitOptions
Defines the configuration used to determine breaking the source into splits.
useMetadata
boolean
Determines whether the reader should use any discovered metadata on ordering and distribution. Default: false.
Ports
The ReadParquet operator provides a single output port.
Name
Type
Get method
Description
output
getOutput()
Provides the record data read from the provided input data files (sources).
Using the ReadFromJDBC Operator to Read Databases
The ReadFromJDBC operator accesses relational database systems using a supplied JDBC driver. The JDBC driver must be in the class path of the DataFlow application. Each database provides a JDBC driver implementation that DataFlow can use to access data in the database. Reference the specific database to be accessed for driver-specific information.
The ReadFromJDBC operator can be used to read all of the columns from a specific table or to execute a provided query. The query provided can be a complex, multitable query. Follow the syntax guidelines of the database being queried.
The results of the query will be made available to the output port of the operator. The operator transforms the database column types to supported DataFlow scalar types. Some database-specific data types may not map well and will either be ignored or mapped to Java Object types.
The results of a database query executed through JDBC are returned through a ResultSet object. The ResultSet is used to iterate through the resultant rows and to access column data. The JDBC ResultSet class does not support multithreaded access. Given that, the default behavior of the ReadFromJDBC operator is to execute in nonparallel mode when provided a nonparameterized query.
To execute queries in parallel (and distributed), the ReadFromJDBC operator supports the use of parameterized queries. JDBC supports adding parameters to a query using the "?" character. Following is an example of a parameterized query. Note the use of the "?" character in the "where" clause.
Example query with parameters
select * from lineitem where l_shipmode = ?
When used as the data query for the ReadFromJDBC operator, a parameterized query can be executed in parallel. A set of parameters must be supplied to the parallel workers executing the parameterized queries. The parameters can be supplied in one of the following ways:
Through the optional input port the ReadFromJDBC operator.
Obtained by a parameter query supplied as a property to the operator ("parameterQuery"). The query is executed and the results are used as parameters to the parameterized query.
An array of values is passed as a property to the ReadFromJDBC operator ("parameters").
Here is an example of a parameter query:
Query to gather parameters
select distinct l_shipmmode from lineitem
Note that the parameter query is selecting a distinct set of values from the lineitem table. The values will be substituted for the "?" in the parameterized query.
The parameters are handled the same whether they are provided directly as objects, read from the input port, or queried through the parameter query. For each set (row) of parameters, the following occurs:
The parameters are substituted within the parameterized data query. From our example, one of the parameter values is "RAIL". When substituted within the example data query, the resultant query becomes select * from lineitem where l_shipmode = RAIL.
The query with the substituted parameters is executed against the database.
The results of the query are streamed to the output of the operator.
When used with a parameterized query and provided query parameters, the ReadFromJDBC operator operates in parallel by creating multiple workers. The query parameters are distributed to the workers in round-robin fashion. The workers execute the data query after applying parameter substitution as described above.
The order of the parameter value is important. The order must match the order of the parameters (the "?") in the data query. This is true for parameter values from the optional input port, provided as objects or from the parameter query. The ReadFromJDBC operator does not have the context to determine which parameter values match which parameters. The ordering of the parameter values is left to the user.
When using a parameterized query, the number of parameter values provided must match the number of parameters in the query. If there is a mismatch in sizes, an exception will be raised and the operator will fail.
To obtain the best performance, the number of sets of query parameters should be greater than the configured parallelism. In our example parameter query, only 7 values are returned. In this case, having parallelism set to anything greater than 7 will be wasteful. Additional streams of execution will have no data to process.
Writing to database tables can be accomplished with the Using the WriteToJDBC Operator to Write to Databases operator.
Code Examples
The following example demonstrates using the ReadFromJDBC operator to access data using the provided SQL statement. Setting the fetch size and the SQL warning limit properties is optional. Default settings will be used if they are not set explicitly.
Either a table name or a SQL statement to execute can be specified. Using a table name is equivalent to using the statement select * from tableName. In this example, a table name is specified.
Using the ReadFromJDBC Operator in Java
ReadFromJDBC reader = graph.add(new ReadFromJDBC());
reader.setDriverName("com.mysql.jdbc.Driver");
reader.setUrl("jdbc:mysql://dbserver:3306/test");
reader.setUser("test");
reader.setPassword("test");
reader.setSqlWarningLimit(20);
reader.setTableName("tpchorders");
Using the ReadFromJDBC Operator in RushScript
var data = dr.readFromJDBC({driverName:'com.mysql.jdbc.Driver', url:'jdbc:mysql://dbserver:3306/test', user:'test', password:'test', sqlWarningLimit:20, tableName:'tpchorders'});
The following example uses a SQL statement directly. Using the SQL statement allows selection of only the desired fields. A complex statement can also be used to join tables together and have the results presented as a single output data set.
Specifying a SQL statement
ReadFromJDBC reader = graph.add(new ReadFromJDBC());
reader.setDriverName("com.mysql.jdbc.Driver");
reader.setUrl("jdbc:mysql://dbserver:3306/test");
reader.setUser("test");
reader.setPassword("test");
reader.setDataQuery("select o_orderkey, o_orderdate, o_totalprice from totalorders");
This example demonstrates using a parameterized query in Java:
Using a parameterized query in Java
ReadFromJDBC reader = graph.add(new ReadFromJDBC());
reader.setDriverName("com.mysql.jdbc.Driver");
reader.setUrl("jdbc:mysql://dbserver:3306/test");
reader.setUser("test");
reader.setPassword("test");
reader.setSqlWarningLimit(20);
reader.setDataQuery("select * from lineitem where l_shipmode = ?");
reader.setParameterQuery("select distinct l_shipmode from lineitem");
This example demonstrates using a parameterized query in RushScript:
Using a parameterized query in RushScript
var data = dr.readFromJDBC({
 driverName:'com.mysql.jdbc.Driver',
 url:'jdbc:mysql://dbserver:3306/test',
 user:'test',
 password:'test',
 sqlWarningLimit:20,
 dataQuery:'select * from lineitem where l_shipmode = ?',
 parameterQuery:'select distinct l_shipmode from lineitem'});
The driver name and URL format are specific to each JDBC driver. See the documentation of the specific database being used for more information on these values.
Properties
The ReadFromJDBC operator supports the following properties:
Name
Type
Description
connectionFactory
The connection factory to use for acquiring connections. A default implementation is provided. A specialized factory can be provided as needed.
dataQuery
String
The SQL select statement to execute to gather data from the source database. The query must be a select statement. The query can be parameterized. If the query is parameterized, the parameters can be provided through the optional input port of the operator or through the parameters or parameterQuery properties. To read all data from a single table, the tableName property can be provided.
discoverOutputAtRuntime
boolean
Specifies whether the output type should be automatically configured at run time. Generally, this setting should not be used since it can potentially decrease overall performance. However, in some situations it may be necessary to use this option instead of configureOutputType() when the metadata for the table is not correctly being discovered. Generally, this should only be used when calling stored procedures that dynamically generate the output type during execution. In this case the correct output type cannot be discovered without executing the full query during composition.
Note:  Only use this option if absolutely necessary. It may negatively affect performance.
driverName
String
The class name of the JDBC driver to use, for example:
"sun.jdbc.odbc.JdbcOdbcDriver".
errorAction
The action to take if a SQL error occurs while reading data from the database.
fetchSize
int
The fetch size to set on the JDBC driver. The fetch size specifies the number of rows to fetch from the remote database at a time. The database response to a query may include many rows of data. Responses are usually buffered from the server to the client. The fetch size specifies how many rows should be buffered at a time. A larger size will consume more client memory but reduce the overall communication latency with the server.
hostNames
List<String> or String[]
(Optional) A list of database server host names. Some database vendors support multiple, distributed database servers. Providing a list of database server hosts allows the operator to distribute the workload across multiple database servers. If the database being accessed does not support this feature, then this property should not be used.
outputType
Sets the expected output type. The types of the output fields must be compatible with those of the underlying JDBC ResultSet. Consider using configureOutputType() to auto-discover the output type.
parameterQuery
String
The query used to gather parameters applied to a parameterized data query.
parameters
Object[][]
An array of arrays of Objects. Each row represents one set of query parameters.
password
String
The password for connecting to the database.
selectStatement
String
(Deprecated) Use the dataQuery property instead.
sqlWarningLimit
int
The maximum number of warnings regarding SQL errors that an operator can emit. This property is only used if the errorAction property is set to ErrorAction.WARN.
tableName
String
The name of the database table to access. Mutually exclusive with the dataQuery property. A simple query is generated to access data from the table. The query will be executed in nonparallel mode.
url
String
The URL specifying the instance of the database to connect with. Reference the documentation for the database to connect with for formatting information.
user
String
The user name for connecting to the database.
Ports
The ReadFromJDBC operator supports one optional input port. This port is used to provide parameters to a parameterized data query.
Name
Type
Get method
Description
input
getInput()
Optionally provides the set of parameters to apply to the parameterized data query.
The ReadFromJDBC operator provides one output port:
Name
Type
Get method
Description
output
getOutput()
Provides the record data resulting from executing the provided SQL query or selecting all data from the given table.
Using the ReadDelimitedText Operator to Read Delimited Text
The ReadDelimitedText operator reads a text file of delimited records as record tokens. Records are identified by the presence of a non-empty, user-defined record separator sequence between each individual record. Output records contain the same fields as the input text. The reader can also filter and/or reorder the fields of the output as necessary.
Delimited text supports up to three distinct user-defined sequences within a record, used to identify field boundaries:
a field separator: found between individual fields. By default, this is the comma character (,).
a field start delimiter: marking the beginning of a field value. By default, this is the double quote character (").
a field end delimiter: marking the end of a field value. By default, this is the double quote character (").
The field separator cannot be empty. The start and end delimiters can be the same value. They can also both (but not individually) be empty, signifying the absence of field delimiters. It is not expected that all fields start and end with a delimiter, though if one starts with a delimiter it must end with one. Fields containing significant characters, such as whitespace and the record and field separators, must be delimited to avoid parsing errors. Should a delimited field need to contain the end delimiter, it is escaped from its normal interpretation by duplicating it. For instance, the value "ab""c" represents a delimited field value of ab"c.
The reader supports incomplete specification of the separators and delimiters. By default, it will attempt to automatically discover these values based on analysis of a sample of the file. We strongly suggest that this discovery ability not be relied upon if these values are already known, as it cannot be guaranteed to produce desirable results in all cases.
The reader requires a schema to provide parsing and type information for the fields. The schema, in conjunction with any specified field filter, defines the output type of the reader. This can be manually constructed through the API provided, although this metadata is often persisted externally. The StructuredSchemaReader class provides support for reading in Pervasive Data Integrator structured schema descriptors (.schema files) for use with readers. Schemas can also be generated from Record Token Types by using the TextRecord.convert methods.
Because delimited text has explicit field markers, it is also possible to perform automated discovery of the schema based on the contents of the file; the reader provides a pluggable discovery mechanism to support this functionality. Custom mechanisms must implement the TextRecordDiscoverer interface. Two implementations are provided in the operator library:
A mechanism using pattern matching for determining field type. Values for a field are compared to the patterns; any patterns which do not match a field are discarded as possibilities. If multiple possibilities exist and the conflict is between numeric types (for example, integers and doubles), the wider of the two is chosen. Otherwise, conflicts are resolved by treating the field as a string. This is the default mechanism used by the operator.
The set of patterns used can be extended by providing additional patterns when setting the schemaDiscovery property. Alternatively, the TextRecord.extendDefault method can be used to to create a new discoverer using the supplied patterns in addition to the defaults. If the default patterns should not be included, create a PatternBasedDiscovery object directly, specifying only the desired patterns.
A mechanism that treats all fields as “raw” strings—that is, without white space trimming and not treating the empty string as NULL. Use the TextRecord.TEXT_FIELD_DISCOVER constant to reference this mechanism.
Both built-in schema discoverers will generate a schema having as many fields as the longest analyzed row. Both use the header row, if present, to name the schema’s fields. Repetitions of the same name will be resolved by adding a suffix to avoid collision; any missing names will be generated as field<n>, where <n> is the field’s index in the schema.
Typically, the output of the reader includes all records in the file, both those with and without parsing errors. Fields that cannot be parsed are null-valued in the resulting record. If desired, the reader can be configured to filter failed records from the output.
Delimited text data may or may not have a header row. The header row is delimited as usual but contains the names of the fields in the data portion of the record. The reader must be told whether a header row exists. If it does, the parser will skip the header row; otherwise the first row is treated as a record and will appear in the output. If a header row does exist and any of the field names are blank, a field name will be generated. Generated field names take the form “fieldN” where N is the zero-based position of the field.
Delimited text files can be parsed in parallel under “optimistic” assumptions: namely, that parse splits do not occur in the middle of a delimited field value and somewhere before an escaped record separator. This is assumed by default but can be disabled, with an accompanying reduction of scalability and performance.
When reading delimited text files there may be metadata information about the data embedded within the files. If the reader has been configured to use this metadata, it can obtain information about the ordering and partitioning of the data when it was written, which can eliminate the need to re-sort or partition the data.
Delimited text can be written Using the WriteDelimitedText Operator to Write Delimited Text.
Code Examples
The first code example shows a simple usage of the reader. The path to the local file name is given as a parameter to the constructor. This could have also been set using the setSource() method. The field separator and header properties are set. Then a record type is built and used as the input schema. Note that the record type must be converted to an acceptable schema before being used by the reader. Also note that the record separator is not specified. It will be determined by the auto discovery mechanism of the reader.
Using the ReadDelimitedText operator example in Java
ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
reader.setFieldSeparator("::");
reader.setHeader(true);
RecordTokenType ratingsType = record(INT("userID"), INT("movieID"), INT("rating"), STRING("timestamp"));
reader.setSchema(TextRecord.convert(ratingsType));
Using the ReadDelimitedText operator usage in RushScript
var ratingsSchema = dr.schema().INT('userID').INT('movieID').INT('rating').STRING('timestamp');

var data = dr.readDelimitedText({source:'data/ratings.txt', fieldSeparator:'::', header:true, schema:ratingsSchema});
The snippet of data below is from the ratings.txt file and can be read using the code example above.
userID::movieID::rating::timestamp
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
This next example reads from a file in a Hadoop Distributed File System (HDFS). The hdfs URL scheme identifies the file as being contained within an HDFS file system. The authority section of the URL specifies the specific HDFS file system. The rest of the path indicates the file path within the HDFS instance. A schema is built for this data since it contains a date field. The format or pattern of the date field must be specified since it is non-standard.
Reading from HDFS with a date type
TextRecord schema = 
    SchemaBuilder.define(
            SchemaBuilder.STRING("accountNumber"),
            SchemaBuilder.STRING("clientName"),
            SchemaBuilder.STRING("companyName"),
            SchemaBuilder.STRING("streetAddress"),
            SchemaBuilder.STRING("city"),
            SchemaBuilder.STRING("state"),
            SchemaBuilder.STRING("zip"),
            SchemaBuilder.STRING("emailAddress"),
            SchemaBuilder.DATE("birthDate", "MM/dd/yyyy"),  // specify pattern for parsing the date
            SchemaBuilder.STRING("accountCodes"),
            SchemaBuilder.DOUBLE("standardPayment"),
            SchemaBuilder.DOUBLE("payment"),
            SchemaBuilder.DOUBLE("balance")
            );
        
// Create a delimited text reader for the accounts data
ReadDelimitedText reader = graph.add(new ReadDelimitedText("hdfs://saturn.englab.local:9000/user/jfalgout/data/Accounts.txt"));
reader.setFieldSeparator(",");
reader.setHeader(true);
reader.setSchema(schema);
Following is a snippet of the data that can be read and parsed with the previous code example. Note that each field is surrounded with a double quote as the field delimiter. Also note the format of the "birthDate" field. It is a non-standard (not ISO) format. The schema used to parse the data specifies the pattern used to parse the date field.
"accountNumber","clientName","companyName","streetAddress","city","state","zip","emailAddress","birthDate","accountCodes","standardPayment","payment","balance"
"01-000667","George P Schell","Market Place Products","334 Hilltop Dr","Mentor","OH","44060-1930","warmst864@aol.com","02/28/1971","XA","101.00","100.00","15.89"
"01-002423","Marc S Brittan","Madson & Huth Communication Co","5653 S Blackstone Avenue, #3E","Chicago","IL","60637-4596","mapper@tcent.net","06/30/1975","BA","144.00","144.00","449.92"
"01-006063","Stephanie A Jernigan","La Salle Clinic","77565 Lorain","Akron","OH","44325-4002","dram@akron.net","11/02/1941","EB|CB","126.00","126.00","262.98"
"01-010474","Ernie Esser","Town & Country Electric Inc.","56 Pricewater","Waltham","MA","2453","hazel@bentley.net","12/15/1962","JA|RB","127.00","127.00","271.75"
"01-010852","Robert A Jacoby","Saturn of Baton Rouge","4001 Lafayette","Baton Rouge","LA","70803-4918","din33@norl.com","12/22/1985","ED|EA|RB|KA","142.00","150.00","423.01"
"01-011625","James C Felli","Bemiss Corp.","23A Carolina Park Circle","Spartanburg","SC","29303-9398","cadair@gw.com","02/21/1940","SB","151.00","155.00","515.41"
In the previous example, the schema could also be discovered, extending the default type patterns to recognize the date formats. This can be done in a fairly straightforward fashion:
Custom schema discovery
// Instead of constructing schema and calling reader.setSchema(schema)

TextDataType usDate= TextTypes.FORMATTED_DATE(new SimpleDateFormat("MM/dd/yyyy"));
List<TypePattern> patterns= Arrays.asList(new TypePattern("\\d{1,2}/\\d{1,2}/\\d+", usDate));

// Simple extension of default pattern-based discovery
reader.setSchemaDiscovery(patterns);

// Complete replacement of schema discoverer
// More interesting when using custom discovery implementation
TextRecordDiscoverer discoverer= TextRecord.extendDefault(patterns);
reader.setSchemaDiscovery(discoverer);
Properties
The ReadDelimitedText operator supports the following properties:
Name
Type
Description
analysisDepth
int
The number of characters to read for performing schema discovery and structural analysis.
autoDiscoverNewline
String
Determines if the record separator should be auto-discovered. Defaul: enabled.
charset
Charset
The character set used by the data source. Default: ISO-8859-1.
charsetName
String
The character set used by the data source by name.
decodeBuffer
int
The size of the buffer, in bytes, used to decode character data. By default, this will be automatically derived using the character set and read buffer size.
discoveryNullIndicator
String
The text value used to represent null values by default in discovered schemas. By default, this is the empty string.
discoveryStringHandling
StringConversion
The default behavior for processing string-valued types in discovered schemas.
encoding
Properties that control character set encoding.
errorAction
CodingErrorAction
The error action determines how to handle errors encoding the input data into the configured character set. The default action is to replace the faulty data with a replacement character.
extraFieldAction
How to handle fields found when parsing the record, but not declared in the schema.
fieldDelimiter
String
Delimiter used to denote the boundaries of a data field.
fieldEndDelimiter
String
Ending delimiter used to denote the boundaries of a data field.
fieldErrorAction
How to handle fields that cannot be parsed.
fieldLengthThreshold
int
The maximum length allowed for a field value before it is considered an error.
fieldSeparator
String
Delimiter used to define the boundary between data fields.
fieldStartDelimiter
String
Starting delimiter used to denote the boundaries of a data field.
header
String
Whether to expect a header row in the source. The header row contains field names.
includeSourceInfo
boolean
Determines whether output records will include additional fields detailing origin information for the record. If true, records will have three additional fields:
sourcePath – the path of the file from which the record originates. If this is not known, it will be NULL.
splitOffset – the offset of the starting byte of the containing split in the source data.
recordOffset – the offset of the first character of the record text from the start of the containing split.
If these names would collide with those defined in the source schema, they will be renamed to avoid collision. These fields are added as the first three of the output and are not affected by the selectedFields property.
lineComment
String
The character sequence indicating a line comment. Lines beginning with this sequence are ignored.
maxRowLength
int
The limit, in characters, for the first row. Zero indicates no maximum.
missingFieldAction
How to handle fields declared in the schema, but not found when parsing the record. If the configured action does not discard the record, the missing fields will be null-valued in the output.
parseErrorAction
How to handle all parsing errors.
parseOptions
The parsing options used by the reader.
pessimisticSplitting
boolean
Configures whether pessimistic file splitting must be used. By default, this is disabled. Pessimistic splitting defines one file split per file (assumes the input files are not splittable).
readBuffer
int
The size of the I/O buffer, in bytes, to use for reads. Default: 64K.
readOnClient
boolean
Determines whether reads are performed by the client or in the cluster. By default, reads are performed in the cluster if executed in a distributed context.
recordSeparator
String
Value to use as a record separator.
recordWarningThreshold
int
The maximum number of records which can have parse warnings before failing.
replacement
String
Replacement string to use when encoding error policy is replacement. Default: '?'
selectedFields
List<String>
The list of input fields to include in the output. Use this to limit the fields written to the output.
schema
The record schema expected in the delimited text source. This property is mutually exclusive with schemaDiscovery; setting one causes the other to be ignored. By default, this property is unset.
schemaDiscovery
The schema discovery mechanism to use. This property is mutually exclusive with schema; setting one causes the other to be ignored. By default, a pattern-based mechanism is used. Supplying a list of pattern/type pairs uses the default discoverer extended with the supplied patterns.
source
ByteSource, Path, or String
Source of the input data to parse as delimited text.
splitOptions
The configuration used in determining how to break the source into splits.
useMetadata
boolean
Whether the reader should use any discovered metadata about the ordering and distribution. Default: false.
Ports
The ReadDelimitedText operator provides a single output port.
Name
Type
Get method
Description
output
getOutput()
Provides the record data to read and parse from the provided input data files (sources).
Using the ReadFixedText Operator to Read Fixed-width Text
Fixed text data contains fields that are not delimited as CSV files are. A schema defines each field and its type, offset, and length within a row of data. Data is parsed from each input row according to the defined position of each field. Field types can be specified along with patterns for parsing the data. Patterns are especially useful for date and timestamp field types.
The ReadFixedText operator reads a text file of fixed-width records as record tokens. Records are identified by the presence of a non-empty, user-defined record separator sequence between each individual record or by the total length of the record if an empty or zero-length record separator is provided. Output records contain the same fields as the input file. The parser can also filter or reorder the fields of the output, as requested.
The reader requires a FixedWidthTextRecord object to provide field position as well as parsing and type information for fields. The schema, in conjunction with any specified field filter, defines the output type of the parser. These can be manually constructed through the API provided, although this metadata is often persisted externally. StructuredSchemaReader provides support for reading in Pervasive Data Integrator structured schema descriptors (.schema files) for use with readers.
Typically, the output of the parsing includes all records in the file, both those with and without parsing errors. Fields that cannot be parsed are null-valued in the resulting record. If desired, the reader can be configured to filter failed records from the output.
Since record boundaries occur at known positions, fixed text files can be parsed in parallel.
Fixed-width text data can be written Using the WriteFixedText Operator to Write Fixed-width Text.
Code Examples
The following example builds a schema that is used by the ReadFixedText operator to read a fixed text format file.
Using the ReadFixedText operator in Java
// Create fixed text reader
ReadFixedText reader = graph.add(new ReadFixedText("data/AccountsFixed.txt"));

// Build the schema. Fields must be added in order of appearance in records.
// The field size must be exact as it determines the position of the field for parsing.
FixedWidthTextRecord schema = new FixedWidthTextRecord(new TextConversionDefaults(StringConversion.NULLABLE_TRIMMED));
schema.defineField("accountNumber", new PaddedTextType(TextTypes.STRING, 9, ' ', Alignment.LEFT));
schema.defineField("name", new PaddedTextType(TextTypes.STRING, 21, ' ', Alignment.LEFT));
schema.defineField("companyName", new PaddedTextType(TextTypes.STRING, 31, ' ', Alignment.LEFT));
schema.defineField("address", new PaddedTextType(TextTypes.STRING, 35, ' ', Alignment.LEFT));
schema.defineField("city", new PaddedTextType(TextTypes.STRING, 16, ' ', Alignment.LEFT));
schema.defineField("state", new PaddedTextType(TextTypes.STRING, 2, ' ', Alignment.LEFT));
schema.defineField("zip", new PaddedTextType(TextTypes.STRING, 10, ' ', Alignment.LEFT));
schema.defineField("emailAddress", new PaddedTextType(TextTypes.STRING, 25, ' ', Alignment.LEFT));
schema.defineField("birthDate", new PaddedTextType(TextTypes.FORMATTED_DATE(new SimpleDateFormat("MM/dd/yyyy")), 10, ' ', Alignment.LEFT));
schema.defineField("accountCodes", new PaddedTextType(TextTypes.STRING, 11, ' ', Alignment.LEFT));
schema.defineField("standardPayment", new PaddedTextType(TextTypes.JAVA_DOUBLE, 6, ' ', Alignment.LEFT));
schema.defineField("payment", new PaddedTextType(TextTypes.JAVA_DOUBLE, 7, ' ', Alignment.LEFT));
schema.defineField("balance", new PaddedTextType(TextTypes.JAVA_DOUBLE, 6, ' ', Alignment.LEFT));

// Set the schema of the reader.
reader.setSchema(schema);
An example of data that can be read with the above code fragment follows. Because of the wide nature of the data, the records will most likely appear across multiple lines of the display.
01-000667George P Schell      Market Place Products          334 Hilltop Dr                     Mentor          OH44060-1930warmst864@aol.com        02/28/1971XA         101.00100.00 15.89 
01-002423Marc S Brittan       Madson & Huth Communication Co 5653 S Blackstone Avenue, #3E      Chicago         IL60637-4596mapper@tcent.net         06/30/1975BA         144.00144.00 449.92
01-006063Stephanie A Jernigan La Salle Clinic                77565 Lorain                       Akron           OH44325-4002dram@akron.net           11/02/1941EB|CB      126.00126.00 262.98
01-010474Ernie Esser          Town & Country Electric Inc.   56 Pricewater                      Waltham         MA2453      hazel@bentley.net        12/15/1962JA|RB      127.00127.00 271.75
01-010852Robert A Jacoby      Saturn of Baton Rouge          4001 Lafayette                     Baton Rouge     LA70803-4918din33@norl.com           12/22/1985ED|EA|RB|KA142.00150.00 423.01
01-011625James C Felli        Bemiss Corp.                   23A Carolina Park Circle           Spartanburg     SC29303-9398cadair@gw.com            02/21/1940SB         151.00155.00 515.41
01-018448Alan W Neebe         Georgia State Credit Union     PO Box 159                         Demorest        GA30535-1177delores@truett.com       01/31/1960MA|ED|SB   113.00120.00 131.89
01-018595Alexander Gose       Office Support Services        436 Green Mountain Circle          New Paltz       NY12561-0023dams@matrix.net          06/19/1940EC         147.00147.00 477.09
The following example demonstrates using the ReadFixedText operator in RushScript. The schema is created in RushScript and passed to the operator.
Using the ReadFixedText operator in RushScript
// Build the schema. Fields must be added in order of appearance in records.
// The field size must be exact as it determines the position of the field for parsing.

var accountsFixedSchema = dr.schema({type:'FIXED'})
    .nullable(true)
   .trimmed(true)
    .padChar(' ')
    .alignment('LEFT')
    .STRING("accountNumber", {size:9})
   .STRING("clientName", {size:21})
   .STRING("companyName", {size:31})
   .STRING("streetAddress", {size:35})
   .STRING("city", {size:16})
   .STRING("state", {size:2})
   .STRING("zip", {size:10})
   .STRING("emailAddress", {size:25})
   .DATE("birthDate", {pattern:'MM/dd/yyyy', size:10})
   .STRING("accountCodes", {size:11})
   .DOUBLE("standardPayment", {pattern:'0.00', size:6})
   .DOUBLE("payment", {pattern:'0.00', size:7})
   .DOUBLE("balance", {pattern:'0.00', size:6});

// Read the data
var data = dr.readFixedText({source:'/path/to/file.txt', schema:accountsFixedSchema});
Properties
The ReadFixedText operator supports the following properties:
Name
Type
Description
charset
Charset
The character set used by the data source. Default: ISO-8859-1.
charsetName
String
The character set used by the data source by name.
decodeBuffer
int
The size of the buffer, in bytes, used to decode character data. By default, this will be automatically derived using the character set and read buffer size.
encoding
Properties that control character set encoding.
errorAction
CodingErrorAction
The error action determines how to handle errors encoding the input data into the configured character set. The default action is to replace the faulty data with a replacement character.
extraFieldAction
How to handle fields found when parsing the record but not declared in the schema.
fieldErrorAction
How to handle fields that cannot be parsed.
fieldLengthThreshold
int
The maximum length allowed for a field value before it is considered an error.
includeSourceInfo
boolean
Determines whether output records will include additional fields detailing origin information for the record. If true, records will have three additional fields:
sourcePath – the path of the file from which the record originates. If this is not known, it will be NULL.
splitOffset – the offset of the starting byte of the containing split in the source data.
recordOffset – the offset of the first character of the record text from the start of the containing split.
If these names would collide with those defined in the source schema, they will be renamed to avoid collision. These fields are added as the first three of the output and are not affected by the selectedFields property.
lineComment
String
The character sequence indicating a line comment. Lines beginning with this sequence are ignored.
missingFieldAction
How to handle fields declared in the schema, but not found when parsing the record. If the configured action does not discard the record, the missing fields will be null-valued in the output.
parseErrorAction
How to handle all parsing errors.
parseOptions
The parsing options used by the reader.
pessimisticSplitting
boolean
Configures whether pessimistic file-splitting must be used. Default: disabled. Pessimistic splitting defines one file split per file (assumes the input files are not splittable).
readBuffer
int
The size of the I/O buffer, in bytes, to use for reads. The default size is 64K.
readOnClient
boolean
Determines whether reads are performed by the client or in the cluster. By default, reads are performed in the cluster if executed in a distributed context.
recordSeparator
String
Value to use as a record separator.
recordWarningThreshold
int
The maximum number of records that can have parse warnings before failing.
replacement
String
Replacement string to use when encoding error policy is replacement. Default: '?'
selectedFields
List<String>
The list of input fields to include in the output. Use this to limit the fields written to the output.
schema
The record schema expected in the delimited text source.
source
ByteSource, Path, or String
Source of the input data to parse as delimited text.
splitOptions
The configuration used in determining how to break the source into splits.
Ports
The ReadFixedText operator provides a single output port.
Name
Type
Get method
Description
output
getOutput()
Provides the record data read and parsed from the provided input data files (sources).
Using the ReadSource Operator to Read Generic Sources
The ReadSource operator reads a defined data source as a stream of records. The data source provides a sequence of bytes in some format that can be parsed into records that are assumed to be identical in logical structure. The mapping between physical and logical structure is encapsulated in a format descriptor, which must be provided.
This operator is low level, providing a generalized model for reading files in a distributed fashion. Typically, the ReadSource operator is not directly used in a graph, instead being indirectly used though a composite operator such as one derived from AbstractReader, providing a more appropriate interface to the end user.
Parallelized reads are implemented by breaking input files into independently parsed pieces, a process called splitting. Splits are then distributed to available partitions and parsed. When run on a distributed cluster, the reader makes an attempt to assign splits to machines where the I/O will be local, but non-local assignment may occur in order to provide work for all partitions. Distributed execution also makes an assumption that the specified data source is accessible from any machine. If this is not the case, the read operator must be made non-parallel by invoking the disableParallelism() method on the operator instance.
Not all formats support splitting; this generally requires a way of unambiguously identifying record boundaries. Formats will indicate whether they can support splitting. If not, each input file will be treated as a single split. Even with a non-splittable format, this means reading multiple files can be parallelized. Some formats can partially support splitting, but in a “optimistic” fashion; under most circumstances splits can be handled, but in some edge cases splitting leads to parse failures. For these cases, the reader supports a “pessimistic” mode that can be used to assume a format is non-splittable, regardless of what it reports.
The reader makes a best-effort attempt to validate the data source before execution but cannot always guarantee correctness, depending on the nature of the data source. This is done to try to prevent misconfigured graphs from executing, such as when the reader may not execute until a late phase where a failure may result in a significant amount of work being lost.
Tip:  This is a low-level operator that typically is not directly used. It can be used with a custom data format. A custom data format may be needed to support a format not provided by the DataFlow library.
Code Example
This example code fragment demonstrates how to set up a reader for a generic file type.
Using the ReadSource operator
ReadSource reader = new ReadSource();
reader.setSource(new BasicByteSource("filesource"));
reader.setFormat(new DelimitedTextFormat(TextRecord.convert(record(INT("intfield"), STRING("stringfield"))),
                   new FieldDelimiterSettings,
                   new CharsetEncoding()));
ParsingOptions options = new ParsingOptions();
options.setSelectedFields("stringfield");
reader.setParseOptions(options);
Properties
The ReadSource operator supports the following properties:
Name
Type
Description
format
The data format for the configured source.
includeSourceInfo
boolean
Determines whether output records will include additional fields detailing origin information for the record. If true, records will have three additional fields:
sourcePath – the path of the file from which the record originates. If this is not known, it will be NULL.
splitOffset – the offset of the starting byte of the containing split in the source data.
recordOffset – the offset of the first character (or byte for binary formats) of the record from the start of the containing split.
If these names would collide with those defined in the source schema, they will be renamed to avoid collision. These fields are added as the first three of the output and are not affected by the selectedFields property.
parseOptions
The parsing options used by the reader.
pessimisticSplitting
boolean
Configures whether pessimistic file splitting must be used. Default: disabled. Pessimistic splitting defines one file split per file (assumes the input files are not splittable).
readOnClient
boolean
Determines whether reads are performed by the client or in the cluster. By default, reads are performed in the cluster if executed in a distributed context.
source
ByteSource, Path, or String
Source of the input data to parse as delimited text.
splitOptions
The configuration used in determining how to break the source into splits.
Ports
The ReadSource operator provides a single output port:
Name
Type
Get Method
Description
output
getOutput()
Data from the defined source parsed into records by the given data format.
Using the ReadLog Operator to Read Log Data
Many applications and systems produce log data that is loosely structured. Generally, there is a specific format used to write the log data; however, this format is not always unambiguously reversible to a typical parser. Also, different fields might use different field separators and delimiters.
About the only generalities that can be made about all log formats is that the records always contain an ID field, usually a timestamp, and a message field consisting of the information that produced the log event. In these cases the log data may not be able to be read by a regular delimited or fixed text reader.
The ReadLog operator reads a text file or alternative source consisting of log events from a particular application or system. The type of application or system producing the log records must be specified in advance through a property setting. The currently supported log types are enumerated by SupportedLogType. Configuring the operator requires the user to either provide one of these enumerations or their own implementation of a particular LogFormat. It should be noted that these settings are mutually exclusive.
In addition to specifying the log type the format pattern may be set. This is a String that provides information about a log format when customization of the format is allowed. It is specific to the type of log being read and therefore may provide more customization based on the logs being read. Additionally the newline character used by the log files may be specified if a nondefault newline character is being used. By default this is determined automatically by examining the first few lines in the source.
The record flow generated by this operator is determined by the log type being read and the log format pattern provided during composition of the operator unless otherwise noted.
Supported Log Types
The ReadLog operator supports a selection of common log formats. These are enumerated by SupportedLogType. Custom log formats can be added by implementing the LogFormat interface. A custom format would be instantiated and provided to the ReadLog operator through the logFormat property. Certain log types can also be manually instantiated and provided to the ReadLog operator when log-specific settings need to be changed, such as log4j’s logging levels.
The various supported log types are listed below.
Generic Log Data
The generic type can be used when the log data can be parsed using a regular expression but there is no dedicated format for the log. The schema is automatically generated by counting the number of groupings in the regular expression provided. The schema can also be set manually by creating a custom instance of the log format.
The generic format takes a valid Java regular expression string. The grouping of the regular expression defines the fields the individual records will be split into.
Default : "(.*)"
Example : "(\\d\\d.\\d+) (\\w+) (\\w+)"
Common Log Format
The CLF type can be used when reading a web server log in common log format. NCSA common log format is specified at http://www.w3.org/Daemon/User/Config/Logging.html#common-logfile-format.
Since CLF is well defined, it does not allow a format pattern to be specified.
Combined Log Format
The Combined type can be used when reading a web server log in combined log format. NCSA combined log format is specified at http://publib.boulder.ibm.com/tividd/td/ITWSA/ITWSA_info45/en_US/HTML/guide/c-logs.html#ncsa.
Combined takes a true or false string, which determines if the optional cookie field is included in the log.
Extended Log Format
The ELF type can be used when reading a web server log in extended log format. Extended log format is specified at http://www.w3.org/TR/WD-logfile.html.
ELF will accept a string in the same form as a Fields directive as specified in the official format. If format discovery is enabled, it will scan the file for any directives and apply them appropriately.
Example : "#Fields: date time cs-method cs-uri"
GlassFish Logs
The GlassFish format supports reading logs produced by GlassFish servers. The GlassFish server log format is specified at http://docs.oracle.com/cd/E18930_01/html/821-2416/abluk.html.
The format pattern supported by the GlassFish format consists of a string that specifies the date format used in the timestamps of the log. Any string supported by Java’s DateFormat class is acceptable.
Default : "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
Example : "dd-MM-yyyy HH:mm:ss"
Log4j Logs
The log4j format supports reading logs produced by the Apache log4j library for Java. More information about the library can be found at http://logging.apache.org/log4j/1.2/.
The log4j format will accept a string in the same form as the conversion pattern that specifies the logging. More information about log4j conversion patterns can be found at http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/EnhancedPatternLayout.html.
Default : "%r [%t] %-5p %c %x - %m%n"
Example : "%d{ISO8601} %p %c: %m%n"
Syslog Logs
The syslog format supports reading logs produced by syslogd and other BSD-compliant syslog producers. The BSD syslog format is specified by RFC-3164.
The format pattern supported by the syslog format consists of a string that includes the current four-digit year and the signed four-digit offset from UTC separated with a single space.
Default : current year and timezone
Example : "2012 -0600"
Code Examples
This example code fragment demonstrates how to set up a reader for a log4j log file.
Using the ReadLog operator
ReadLog reader = graph.add(new ReadLog(data/log4jdata.log));
reader.setLogType(SupportedLogType.LOG4J);
reader.setLogPattern("%d{ISO8601} %p %c: %m%n");
reader.setNewline("\n");
Using the ReadLog operator in RushScript
var data = dr.readLog({source:'data/log4jdata.log', logType:'LOG4J', logPattern:'%d{ISO8601} %p %c: %m%n', newLine:'\n');
Properties
The ReadLog operator supports the following properties:
Name
Type
Description
autoDiscoverFormat
boolean
Determines if additional format information should be acquired by scanning the source. Only certain formats can acquire additional metadata through this process. Default: enabled.
autoDiscoverNewline
boolean
Determines if the newline characters should be auto-discovered. Default: enabled.
charset
Charset
The character set used by the data source. Default: ISO-8859-1.
charsetName
String
The character set used by the data source by name.
decodeBuffer
int
The size of the buffer, in bytes, used to decode character data. By default, this will be automatically derived using the character set and read buffer size.
encoding
Properties that control character set encoding.
errorAction
CodingErrorAction
The error action determines how to handle errors encoding the input data into the configured character set. The default action is to replace the faulty data with a replacement character.
extraFieldAction
How to handle fields found when parsing the record, but not declared in the schema.
fieldErrorAction
How to handle fields which cannot be parsed.
fieldLengthThreshold
int
The maximum length allowed for a field value before it is considered an error.
includeSourceInfo
boolean
Determines whether output records will include additional fields detailing origin information for the record. If true, records will have three additional fields:
sourcePath – the path of the file from which the record originates. If this is not known, it will be NULL.
splitOffset – the offset of the starting byte of the containing split in the source data.
recordOffset – the offset of the first character of the record text from the start of the containing split.
If these names would collide with those defined in the source schema, they will be renamed so as to avoid collision. These fields are added as the first three of the output and are not affected by the selectedFields property.
logFormat
The type of log this operator will read. This can be used when support for custom log formats is needed or additional settings are required.
logPattern
String
The format pattern to use when reading the logs. This is specific to the type of log being read.
logType
The type of log this operator will read. This property is mutually exclusive with logFormat.
missingFieldAction
How to handle fields declared in the schema but not found when parsing the record. If the configured action does not discard the record, the missing fields will be null-valued in the output.
newline
String
The newline character used in the logs. Setting this will disable the autoDiscoverNewline property.
parseErrorAction
How to handle all parsing errors.
parseOptions
The parsing options used by the reader.
pessimisticSplitting
boolean
Configures whether pessimistic file splitting must be used. By default, this is disabled. Pessimistic splitting defines one split per file (assumes the input files are not splittable).
readBuffer
int
The size of the I/O buffer, in bytes, to use for reads. Default: 64K.
readOnClient
boolean
Determines whether reads are performed locally or distributed in the cluster. By default, reads are performed in the cluster if executed in a distributed context.
recordWarningThreshold
int
The maximum number of records that can have parse warnings before failing.
replacement
String
Replacement string to use when encoding error policy is replacement. Default: '?'
selectedFields
List<String>
The list of input fields to include in the output. Use this to limit the fields written to the output.
source
ByteSource, Path, or String
Source of the input data to parse.
splitOptions
The configuration used in determining how to break the source into splits.
Ports
The ReadLog operator provides a single output port.
Name
Type
Get method
Description
output
getOutput()
Provides the record data read and parsed from the provided input log files (sources).
Using the ReadARFF Operator to Read Sparse Data
This topic describes the DataFlow ARFF read operator. For information on its KNIME node, see ARFF Reader.
Sparse data is useful for data sets that contain a large number of fields where most of the fields do not have data values. This is mostly the case with numeric data, but can also be applied to enumerated data types. A common example is a data set that contains a row-per-website user with a field-per-website page. Each field contains a count of the number of times a user has visited the specific page. Most users will visit only a fraction of the overall pages on the website. Using a sparse data representation will allow the data set to be much smaller in size than a fully populated, dense data set.
DataFlow supports sparse data using the Attribute-Relation File Format (ARFF). The ReadARFF operator is used to read sparse data stored in ARFF. Files using ARFF can be in either sparse or dense mode. This reader detects the mode and reads the data accordingly. ARFF files contain schema information. The schema is parsed and used to determine how to parse data records.
ARFF can be parsed in parallel under “optimistic” assumptions: namely, that parse splits do not occur in the middle of a delimited field value and somewhere before an escaped record separator. This is assumed by default, but can be disabled with an accompanying reduction of scalability and performance.
ARFF data is used by DataFlow to represent sparse data. But it can also be used to store dense data in a CSV style. The ARFF mode determines which format to use: sparse or dense. The reader automatically discovers the mode.
The ARFF metadata also contains two other data values: the relation name and comments. The relation name is specified as one of the metadata headers. Comments are lines that start with the "%" character. Comments are returned as a list of String values.
Data can be written in ARFF Using the WriteARFF Operator to Write Sparse Data.
Code Examples
Since ARFF includes metadata that contains field names and types, the schema for ARFF files does not have to be specified. The metadata can be accessed using the discoverMetadata() method on the reader after the data source has been configured. The metadata can be used to access the relation name, comments, ARFF mode, and data schema. The schema contains the field names and types along with patterns for parsing and formatting field values.
Using the ReadARFF operator in Java
// Create ARFF reader
ReadARFF reader = graph.add(new ReadARFF("data/weather.arff"));

// Get metadata for the configured data source
Analysis metadata = reader.discoverMetadata(FileClient.basicClient());
ARFFMode mode = metadata.getMode();
String relationName = metadata.getRelationName();
List<String> comments = metadata.getComments();
TextRecord schema = metadata.getSchema();

// Dump out metadata values
System.out.println("mode = " + mode);
System.out.println("relationName = " + relationName);
System.out.println("comments = " + comments);
System.out.println("schema = " + schema.getFieldNames());
Following is a snippet of output from running an application with the previous code fragment.
mode = DENSE
relationName = weather
comments = []
schema = [outlook, temperature, humidity, windy, play]
Using the ReadARFF operator in RushScript
var data = dr.readARFF(source:'data/weather.arff');
The weather.arff file’s contents:
@relation weather

@attribute outlook {sunny, overcast, rainy}
@attribute temperature real
@attribute humidity real
@attribute windy {TRUE, FALSE}
@attribute play {yes, no}

@data
sunny,85,85,FALSE,no
sunny,80,90,TRUE,no
overcast,83,86,FALSE,yes
rainy,70,96,FALSE,yes
rainy,68,80,FALSE,yes
rainy,65,70,TRUE,no
overcast,64,65,TRUE,yes
sunny,72,95,FALSE,no
sunny,69,70,FALSE,yes
rainy,75,80,FALSE,yes
sunny,75,70,TRUE,yes
overcast,72,90,TRUE,yes
overcast,81,75,FALSE,yes
rainy,71,91,TRUE,no
Properties
The ReadARFF operator supports the following properties:
Name
Type
Description
charset
Charset
The character set used by the data source. Default: ISO-8859-1.
charsetName
String
The character set used by the data source by name.
decodeBuffer
int
The size of the buffer, in bytes, used to decode character data. By default, this will be automatically derived using the character set and read buffer size.
encoding
Properties that control character set encoding.
errorAction
CodingErrorAction
The error action determines how to handle errors encoding the input data into the configured character set. The default action is to replace the faulty data with a replacement character.
fieldDelimiter
char
The ARFF format supports only a single quote or a double quote as the field delimiter. Default: single quote (').
includeSourceInfo
boolean
Determines whether output records will include additional fields detailing origin information for the record. If true, records will have three additional fields:
sourcePath – the path of the file from which the record originates. If this is not known, it will be NULL.
splitOffset – the offset of the starting byte of the containing split in the source data.
recordOffset – the offset of the first character of the record text from the start of the containing split.
If these names would collide with those defined in the source schema, they will be renamed to avoid collision. These fields are added as the first three of the output and are not affected by the selectedFields property.
readBuffer
int
The size of the I/O buffer, in bytes, to use for reads. Default: 64K.
replacement
String
Replacement string to use when encoding error policy is replacement. Default: '?'
selectedFields
List<String>
The list of input fields to include in the output. Use this to limit the fields written to the output.
source
ByteSource, Path, or String
Source of the input data to parse as delimited text.
splitOptions
The configuration used in determining how to break the source into splits.
Ports
The ReadARFF operator provides one output port:
Name
Type
Get Method
Description
output
getOutput()
Output port containing the data read from the ARFF source.
Using the ReadStagingDataset Operator to Read Staging Datasets
Staging data sets are used within DataFlow for writing intermediate data into a fast and efficient binary format. They are also very convenient since metadata is stored in a header section of the data set. When reading a data set, the metadata is used to determine the fields and types of data contained within the data set.
Text files can be used for intermediate data access also but incur the overhead of formatting and parsing. Text files also can introduce data errors due to having to convert numeric data into textual formats. Staging data sets do not require the round trip to and from text and so bypass these issues.
The DataFlow framework uses staging data sets in the following situations:
In between application phases where one phase feeds data to the next
Staging data to disk during merge sort operations
Staging data to disk due to queue overflow in deadlock situations
There are several use cases for directly using staging data sets. A few examples are:
A particular task that needs to be accomplished must be broken into several DataFlow applications. Data from each application must be passed to the next. Staging data sets can be used to capture the intermediate data, allowing for quick and easy access.
Data within a database is being used to build a predictive model. This development will require many iterations of algorithm testing with the data. For this use case, the most efficient route is to first read the data from the source database and write it to any staging data sets. Then use the staged data as input into the predictive analytic development. The one-time read from the database is overhead. However, given that the source data will be read many times, the savings are substantial.
As with other file-based operators, the ReadStagingDataset operator accepts several data sources:
A ByteSource reference
A Path reference
A String reference containing:
The path to a single data set
A path to a directory containing many data sets with the same format and schema
A path with wildcard characters within the file name that will be resolved to the matching data set files
Staging data sets can be written Using the WriteStagingDataset Operator to Write Staging Data Sets.
Code Examples
This code example constructs a new ReadStagingDataset operator by passing in the name of the file to read as a constructor parameter. Properties for setting the I/O buffer size and the selected fields are then set. The selected fields must contain field names that exist in the data set. Use the selectedFields property to constrain the fields read from the data set and provided to the output port of the operator.
Using the ReadStagingDataset operator in Java
ReadStagingDataset reader = graph.add(new ReadStagingDataset("results/ratings-stage"));
reader.setReadBuffer(128 * 1024);
reader.setSelectedFields(Arrays.asList(new String[] {"userID", "rating"}));
Using the ReadStagingDataset operator in RushScript
var data = dr.readStagingDataset({source:'results/ratings-stage', readBuffer:131072});
The operator also supports a static method for reading the metadata of a staging data set. The following code example depicts how to read and use the metadata.
Using staging dataset metadata
ReadStagingDataset reader = graph.add(new ReadStagingDataset("results/ratings-stage"));
reader.setReadBuffer(128 * 1024);
reader.setSelectedFields(Arrays.asList(new String[] {"userID", "rating"}));

DatasetMetadata metadata = reader.discoverMetadata();
long rowCount = metadata.getRowCount();
int blockSize = metadata.getBlockSize();
TokenType datasetType = metadata.getSchema();
DatasetStorageFormat format = metadata.getStorageFormat();

System.out.println("rowCount = " + rowCount);
System.out.println("blockSize = " + blockSize);
System.out.println("datasetType = " + datasetType);
System.out.println("format = " + format);
The output of the metadata information from the previous application is shown following. This metadata can be used within an application as needed.
rowCount = 1000
blockSize = 64
datasetType = {"type":"record","representation":"DENSE_BASE_NULL","fields":[{"userID":{"type":"int"}},{"movieID":{"type":"int"}},{"rating":{"type":"int"}},{"timestamp":{"type":"string"}}]}
format = COMPACT_ROW
Properties
The ReadStagingDataset operator supports the following properties:
Name
Type
Description
includeSourceInfo
boolean
Determines whether output records will include additional fields detailing origin information for the record. If true, records will have three additional fields:
sourcePath – the path of the file from which the record originates. If this is not known, it will be NULL.
splitOffset – the offset of the starting byte of the containing split in the source data.
recordOffset – a unique identifier for the record which preserves the relative ordering of records, approximating the byte offset of the encoded record in the source split.
If these names would collide with those defined in the source schema, they will be renamed to avoid collision. These fields are added as the first three of the output and are not affected by the selectedFields property.
pessimisticSplitting
boolean
Configures whether pessimistic file splitting must be used. Default: disabled. Pessimistic splitting defines one file split per file (assumes the input files are not splittable).
readBuffer
int
The size of the I/O buffer, in bytes, to use for reads. Default: 64K.
readOnClient
boolean
Determines whether reads are performed by the client or in the cluster. By default, reads are performed in the cluster, if executed in a distributed context.
selectedFields
List<String>
The list of input fields to include in the output. Use this to limit the fields written to the output.
source
ByteSource, Path, or String
Source of the input data to parse as delimited text.
Ports
The ReadStagingDataset operator provides a single output port:
Name
Type
Get method
Description
output
getOutput()
Provides the record data read and parsed from the provided input data files (sources).
Using the ParseTextFields Operator to Parse Text Records
The ParseTextFields operator is similar to the ReadDelimitedText and other text-based read operators. However, instead of reading from a source of text, it takes a flow of records consisting of text fields as input. These input strings are then parsed into other value types as specified by the schema provided to the operator. Additionally, it emits a flow of records which failed parsing, allowing remediation to be performed on invalid records.
The parsed output will have the type specified by the schema. Output fields will contain the result of processing the input field of the same name according to the type information in the provided schema. Referenced input fields must either be string valued, in which case they are parsed according to the schema, or of a type which is assignable to the output field's type, in which case they are copied as-is.
If a field is present in the schema, but not in the input, the output field is NULL. If an input value is NULL, the resulting output field is NULL. Input fields without a matching field in the schema are dropped.
The rejected output has the same type as the input. Field values will match those of the failed input record.
ParseTextFields only performs semantic validation of the input; the input must have been already broken into field values. This can be accomplished by using ReadDelimitedText with a schema consisting of all string fields or by custom operators. It should also be noted that a special schema discoverer TextRecord.TEXT_FIELD_DISCOVER is provided for when the schema of the source field may not be known.
Code Examples
Following is an example usage of the parser. In this case, notice that the input field acctNumber is not defined in the schema, so is absent from the outputs. Conversely, acctCodes is present in the schema, but not in the input; it is present in the output, but will be NULL for every record. Additionally note that the input fields that are in the schema appear in a different order.
Using ParseTextFields in Java
// Define a "raw" schema, identifying fields but not parsing content
RecordTokenType rawType=record(STRING("acctNumber"),STRING("company"),
                               STRING("startDate"),STRING("balance"));
TextRecord rawSchema= TextRecord.convert(rawType, StringConversion.RAW);

// Read "raw" fields
ReadDelimitedText reader= graph.add(new ReadDelimitedText(...));
reader.setSchema(rawSchema);

// Define the parsing schema
TextRecord schema =
  SchemaBuilder.define(
            SchemaBuilder.STRING("company"),
            SchemaBuilder.STRING("acctCode"),
            SchemaBuilder.DOUBLE("balance"),
            SchemaBuilder.DATE("startDate", "MM/dd/yyyy") // specify pattern for parsing the date
            );
        
// Parse the text records
ParseTextFields parser= graph.add(new ParseTextFields());
parser.setSchema(schema);
graph.connect(reader.getOutput(), parser.getInput());
Using ParseTextFields in RushScript
var rawschema= dr.schema()
    .nullable(false).trimmed(false)  // Want strings in raw form
    .STRING("acctNumber")
    .STRING("company")
    .STRING("startDate")
    .STRING("balance");

var rawdata= dr.readDelimitedText({source:..., schema:rawschema});


var schema = dr.schema()
    .STRING('company')
    .STRING('acctCodes')
    .DOUBLE('balance')
    .DATE('startDate','MM/dd/yyyy');

var parsed= dr.parseTextFields(rawdata, {schema:movieschema});
To better illustrate the behavior of the previous code, consider the following source data (all fields are string values).
acctNumber
company
startDate
balance
"1"
"Acme Corporation"
"9/17/1949"
"1500.00"
"2"
"Spacely Sprockets"
"2062-01-01"
"1000.00"
"3"
"Duff Beer"
"12/17/1989"
NULL
This yields the data below on the output port. Here values are parsed as the appropriate type for the field. As the balance field is NULL in the source, it is NULL in the output.
company
acctCodes
balance
startDate
"Acme Corporation"
NULL
1500.00
1949-09-17
"Duff Beer"
NULL
NULL
1989-12-17
Meanwhile, the rejects port will produce the following data, as the startDate field has the wrong format (all fields are string values).
acctNumber
company
startDate
balance
"2"
"Spacely Sprockets"
"2062-01-01"
"1000.00"
Properties
The ParseTextFields operator supports the following properties.
Name
Type
Description
schema
The record schema expected for the input and required for the output. Schema fields will be matched to input fields by name.
Ports
The ParseTextFields operator provides two output ports.
Name
Type
Get method
Description
output
getOutput()
Provides the successfully parsed input records.
rejects
getRejects()
Provides the input records which failed parsing.
Using the ReadJSON Operator to Read JSON Text
This topic describes the DataFlow JSON read operator. For information about its KNIME node, see JSON Reader.
The ReadJSON operator reads a JSON file containing key-value pairs or array of objects as record tokens. It supports JSON Lines format as described at http://jsonlines.org/. The formatted text in JSON Lines contains a single JSON record per line. Each record is separated by a newline character.
In JSON, all field keys must start and end with a delimiter. A "(double quote) is typically used as the field delimiter. However, you may enable the allowSingleQuotes property to avoid parsing errors when single quotes are used. The ReadJSON operator uses the Jackson JSON parsing library to parse fields.
The reader may optionally specify RecordTextSchema to provide parsing and type information for fields. The schema, in conjunction with any specified field filter, defines the output type of the reader. This can be manually constructed using the provided DataFlow API.
The StructuredSchemaReader class provides support for reading the DataConnect structured schema descriptors (.schema files) to use with readers. Also, automated schema discovery can be performed based on the contents of the file because JSON text has explicit field markers. The reader provides a pluggable discovery mechanism to support this function. By default, the schema is automatically discovered with the initial assumption that all the fields are strings. The discovered fields are named using the available key fields.
Normally, the reader output file includes all the parsed records with and without parsing errors. The fields that cannot be parsed have null values in the output. If required, the reader can be configured to filter the failed records from the output.
JSON text does not contain a header row since the keys in a JSON record define the fields in the output. JSON text files can be parsed in parallel with the "optimistic" assumption that the data is well formatted as per the JSON Lines standard.
Code Examples
The first code example shows a simple usage of the reader. The path to the local file name is provided as a parameter to the constructor. This can also be set using the setSource() method. A record type is built and used as the input schema. The record type must be converted to an acceptable schema before it is used by the reader.
Using the ReadJSON operator in Java
The following code shows how to use the ReadJSON operator in Java:
ReadJSON reader = graph.add(new ReadJSON("data/iris.jsonl"));
RecordTokenType schema = record(DOUBLE("sepallength"), DOUBLE("sepalwidth"), DOUBLE("petallength"), DOUBLE("petalwidth"), STRING("class"));
reader.setSchema(TextRecord.convert(schema));
Using ReadJSON operator in RushScript
The following code shows how to use the ReadJSON operator in RushScript:
var irisSchema = dr.schema().DOUBLE("sepallength"), DOUBLE("sepalwidth"), DOUBLE("petallength"), DOUBLE("petalwidth"), STRING("class");
var data = dr.readjson({source:'data/iris.jsonl ', schema: irisSchema});
The following data snippet is from the iris.jsonl file and can be read using the preceding code examples (either Java or RushScript).
{"sepallength":5.1,"sepalwidth":3.5,"petallength":1.4,"petalwidth":0.2,"class":"Iris-setosa"}
{"sepallength":4.9,"sepalwidth":3,"petallength":1.4,"petalwidth":0.2,"class":"Iris-setosa"}
{"sepallength":4.7,"sepalwidth":3.2,"petallength":1.3,"petalwidth":0.2,"class":"Iris-setosa"}
{"sepallength":7,"sepalwidth":3.2,"petallength":4.7,"petalwidth":1.4,"class":"Iris-versicolor"}
{"sepallength":6.4,"sepalwidth":3.2,"petallength":4.5,"petalwidth":1.5,"class":"Iris-versicolor"}
{"sepallength":6.9,"sepalwidth":3.1,"petallength":4.9,"petalwidth":1.5,"class":"Iris-versicolor"}
{"sepallength":6.3,"sepalwidth":3.3,"petallength":6,"petalwidth":2.5,"class":"Iris-virginica"}
{"sepallength":5.8,"sepalwidth":2.7,"petallength":5.1,"petalwidth":1.9,"class":"Iris-virginica"}
{"sepallength":7.1,"sepalwidth":3,"petallength":5.9,"petalwidth":2.1,"class":"Iris-virginica"}
{"sepallength":6.3,"sepalwidth":2.9,"petallength":5.6,"petalwidth":1.8,"class":"Iris-virginica"}
Properties
The ReadJSON operator supports the following properties.
Name
Type
Description
allowComments
Boolean
Determines whether the parser will allow using Java or C++ style comments (both '/'+'*' and '//' types) within parsed content or not. Default is False.
allowUnquotedFieldNames
Boolean
Determines whether the parser will allow using unquoted field names. Default is False.
allowSingleQuotes
Boolean
Determines whether the parser will allow using single quotes (apostrophe, character '\'') for quoting strings. Default is False.
allowUnquotedControlChars
Boolean
Determines whether the parser will allow JSON strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed). Default is False.
allowBackslashEscapingAny
Boolean
Determines whether the parser will allow quoting any character using backslash quoting mechanism. If it is not enabled, only characters that are explicitly listed by JSON specification can be escaped. Default is False.
allowNumericLeadingZeros
Boolean
Determines whether the parser will allow numbers to start with additional leading zeros. If the leading zeros are allowed for numbers in source, then this field must be set to True. Default is False.
allowNonNumericNumbers
Boolean
Determines whether the parser is allowed to recognize "Not a Number" (NaN) token as legal floating point values. Default is False.
analysisDepth
Int
Indicates the number of characters to read for performing schema discovery and structural analysis. Default is False.
charset
Charset
Indicates the character set used by the data source. Default is UTF-8.
charsetName
String
Indicates the character set used by the data source based on the name.
decodeBuffer
Int
Indicates the buffer size (in bytes) used to decode character data. By default, this is automatically derived using the character set and read buffer size.
discoveryNullIndicator
Int
Indicates the text value used to represent null values by default in discovered schemas. By default, this is the empty string.
discoveryStringHandling
StringConversion
Indicates the default behavior for processing string-valued types in discovered schemas.
encoding
Indicates the properties that control character set encoding.
errorAction
CodingErrorAction
Determines the action to be performed for errors encoding the input data into the configured character set. The default action replaces the faulty data with a replacement character.
extraFieldAction
Determines the action to be performed for fields that are found when parsing the record, but not declared in the schema.
fieldErrorAction
Determines the action to be performed for fields that cannot be parsed.
fieldLengthThreshold
Int
Indicates the maximum length allowed for a field value before it is considered an error.
includeSourceInfo
Boolean
Determines whether output records will include additional fields that provides origin information for the record. If true, records will have three additional fields:
sourcePath - Path of the file from which the record originates. If this is not known, the value is Null.
SplitOffset - Offset of the starting byte of the containing split in the source data.
recordOffset - Offset of the first character of the record text from the start of the containing split.
If these names collide with the names defined in the source schema, they will be renamed to avoid collision. These fields are added as the first three of the output and are not affected by the selectedFields property.
missingFieldAction
Indicates how to handle fields declared in the schema, but not found when parsing the record. If the configured action does not discard the record, then the missing fields will be null-valued in the output.
multilineFormat
 
Determines whether the file should be parsed as a multiline JSON file, which allows each JSON record to span multiple lines. Otherwise the data must be in JSON lines format. Default is False.
parseErrorAction
Indicates the action to be performed for all parsing errors.
parseOptions
Indicates the parsing options used by the reader.
pessimisticSplitting
Boolean
Configures whether pessimistic file splitting must be used. By default, this is disabled. Pessimistic splitting defines one file split per file (assumes the input files cannot be split).
readBuffer
Int
Indicates the size of the I/O buffer (in bytes) to use for reads. Default is 64K.
readOnClient
Boolean
Determines whether reads are performed by the client or in the cluster. By default, reads are performed in the cluster if executed in a distributed context.
recordWarningThreshold
Int
Indicates the maximum number of records that can have parse warnings before it fails.
replacement
String
Indicates the replacement string to be used when encoding error policy is replacement. Default is '?'.
selectedFields
List<String>
 
Indicates the list of input fields to be included in the output. This sets a limit for the fields written to the output.
schema
Indicates the record schema expected in the delimited text source. This property is mutually exclusive with the schemaDiscovery property. If either of the two property is set, then the other property is ignored. By default, this property is not set.
schemaDiscovery
Indicates the schema discovery mechanism to be used. This property is mutually exclusive with the schema property. If either of the two property is set, then the other property is ignored. By default, a pattern-based mechanism is used. Providing a list of pattern/type pairs uses the default discoverer extended with the supplied patterns.
source
Indicates the source of the input data to be parsed as delimited text.
splitOptions
Indicates the configuration used to determine how to break the source into splits.
Ports
The ReadJSON operator provides a single output port.
Name
Type
Get method
Description
output
getOutput()
Provides the record data to read and parse from the provided input data files (sources).
Write Operators
Using the WriteAvro Operator to Write Apache Avro Data
Using the WriteToJDBC Operator to Write to Databases
Using the DeleteFromJDBC Operator to Write Database Deletes
Using the UpdateInJDBC Operator to Write Database Updates
Using the WriteDelimitedText Operator to Write Delimited Text
Using the WriteFixedText Operator to Write Fixed-width Text
Using the WriteSink Operator to Write Generic Targets
Using the WriteStagingDataset Operator to Write Staging Data Sets
Using the WriteARFF Operator to Write Sparse Data
Using the ForceRecordStaging Operator to Explicitly Stage Data
Using the WriteORC Operator to Write Apache ORC Files
Using the WriteAvro Operator to Write Apache Avro Data
The WriteAvro operator writes a stream of records in Apache Avro format. Avro is a commonly used binary format offering data compression and the ability to be parsed in parallel. The writer must provide an encoding schema which is serialized with the data, providing the reader with sufficient metadata to decode it.
The operator can be used to write data with a predefined schema or, if one is not provided, generate a schema automatically. When appending to an existing file, the schema encoded in the target is used, ignoring any that were set on the operator itself.
Input fields are mapped to fields in the output schema by name. Any input fields not mapped to an output field will be omitted from the output. Any output fields not mapped to an input field will be null-valued; if this is the case and the field schema does not allow null values—that is, it is not a UNION involving the NULL schema type, the writer will raise an error.
Input mappings must be to a compatible Avro schema type. An error will be raised if a field has an incompatible type. Furthermore, if a field schema does not allow null values, but the source field takes on a null value during execution—the writer will fail with an error. Valid mappings are described in the following table:
Target Avro Type
Valid DataFlow Types
BOOLEAN
BOOLEAN
BYTES
BINARY
FIXED
BINARY; values will be checked at execution to ensure the correct length, failing if they are not
DOUBLE
NUMERIC (with possible loss of precision), DOUBLE, FLOAT, LONG, INT
FLOAT
FLOAT, LONG, INT
LONG
LONG, INT
INT
INT
STRING
STRING
ENUM
STRING; values will be checked at execution to ensure they are valid symbols, failing if they are not
The remaining Avro schema types are handled as follows:
RECORD can only be used as the top-level schema provided to the writer; nested records are not allowed. However, special record types representing the DataFlow DATE, TIME, and TIMESTAMP times are used in mapping generated schemas.
UNION can be used only if one of the component schemas has a valid mapping.
ARRAY and MAP are unsupported.
When no schema is provided, either from the target or on the operator, one will be generated based on the input data. The resulting schema will have the same field names and ordering as the input record.
Because DataFlow allows a greater range of field identifiers than Avro, a cleansing process must be applied to field names when generating a schema so that the resulting fields are valid. Names are cleansed by replacing any characters which are not alphanumeric or underscore ('_') with the underscore character. If the resulting name does not start with an underscore or alphabetic character, an underscore is prepended to the name. For example, a field named "field#1" is mapped to "field_1", a field named "#1 field" to "_1_field", and a field named "1st field" to "_1st_field". Should the cleansing process yield the same name for two different fields, schema generation will procude an error.
As all DataFlow values are nullable, the generated schema for each field is always a UNION involving NULL plus one other type as indicated by the source field type:
DOUBLE, FLOAT, LONG, and INT fields are mapped to the primitive Avro types of the same name.
STRING fields are mapped differently depending on whether they have a FieldDomain associated with them. If they have no domain, they are mapped to the STRING primitive type. If they have a domain, they are mapped to an Avro ENUM type with symbols matching the valid domain.
BINARY fields are mapped to the BYTES primitive type.
NUMERIC fields are mapped to the DOUBLE primitive type, with possible loss of precision.
CHAR fields are mapped to the STRING primitive type.
DATE fields are mapped to a nested RECORD type consisting of a single LONG field named epochDays.
TIME fields are mapped to a nested RECORD type consisting of a single INT field named dayMillis.
TIMESTAMP fields are mapped to a nested RECORD type consisting of three fields: a LONG field named epochSecs, an INT field named subsecNanos, and another INT field named offsetSecs.
As with all file-based write operators, the WriteAvro operator will treat its sink as a directory by default. A file-per-application data stream will be output in this case. This can be overridden using the writeSingleSink property.
When writing Avro data, the metadata containing information about the ordering and partitioning of the data may optionally be saved with the data. This can provide a performance improvement when reading the data later since the data may not need to be re-sorted or partitioned again to ensure it meets various requirements of the graph.
Code Examples
The following code example demonstrates using the WriteAvro operator to write a set of data files, generating a default schema.
Using the WriteAvro operator in Java
// Construct the writer with the path where the results are to be written.
// Use OVERWRITE mode indicating the output file should be overwritten if it exists already.
WriteAvro writer = graph.add(
                new WriteAvro("results/test-ratings", WriteMode.OVERWRITE));
        
// Connect the output of the reader to the input of the writer
graph.connect(operator.getOutput(), writer.getInput());
The following example demonstrates using the operator in RushScript, providing the schema as a file (ratings.avrs) containing the JSON serialization of an Avro Schema object.
Using the WriteAvro operator in RushScript
// Note that this operator doesn't return anything
dr.writeAvro(data, {target:'results/test-ratings', mode:'OVERWRITE', schema:'ratings.avrs'});
Properties
The WriteAvro operator supports the following properties:
Name
Type
Description
compression
The compression method used when writing the Avro file. Default: NONE.
formatOptions
The formatting options used by the reader. This sets all format options at once.
mode
Determines how the writer should handle an existing target.
saveMetadata
boolean
Whether the writer should save metadata information about the ordering and distribution of the data. Default: false.
schema
String or org.apache.avro.Schema
The schema to use when writing the Avro file. If a String is provided, it is interpreted as the name of a file containing a JSON serialization of the Schema.
target
ByteSink, Path, or String
The sink to write. Path and String parameters are converted to sinks. If the writer is parallelized, this is interpreted as a directory in which each partition will write a fragment of the entire input stream. Otherwise, it is interpreted as the file to write.
writeBuffer
int
The size of the I/O buffer, in bytes, to use for writes. Default: 64K.
writeSingleSink
boolean
Whether the writer should produce a single output file or multiple ones. By default, an output file will be produced for each partition if the target sink supports this.
writeOnClient
boolean
Determines whether writes are performed by the local client or in the cluster. This option also disables the parallelism during the write, producing a merge of the data on the local client. By default, writes are performed in the cluster if executed in a distributed context.
Ports
The WriteAvro operator provides a single input port.
Name
Type
Get method
Description
input
getInput()
The input data to be written to the provided sink in delimited text format.
Using the WriteToJDBC Operator to Write to Databases
In its simplest form the WriteToJDBC operator writes records from an input port to a target table in a database using a JDBC driver. Data from the input data source will be added to the target table using database insert statements. Constraints may be applied by the database.
Initial and final SQL statements can be provided. The initial statement is run before any input data is handled. The final statement is executed after all input data has been processed.
The syntax of the supported SQL varies from database to database. Using standard SQL is generally a good practice to prevent errors when switching from one database to another.
The JDBC driver for the database being used must be in the Java classpath of the application using this operator. Databases generally provide a jar file that contains the JDBC driver and its dependencies.
Note that the WriteToJDBC operator operates in parallel by default. Each parallel instance will open a separate connection to the target database. To limit the number of database connections used by this operator, tune down the operator parallelism level.
Within Java, create an OperatorSettings instance with the maximum parallelism level set to the wanted level. Pass the operator settings instance to the add() method of the composition context within which the operator is being added. See the code example below.
Within RushScript, set the maxParallelism level property within the property settings of the operator instance. See the code example below.
To read from database tables using JDBC, see Using the ReadFromJDBC Operator to Read Databases.
Code Examples
The following example reads data from a local file and writes the data into a table in MySQL.
Using the WriteToJDBC Operator in Java to append data to a table
// Create an operator settings specifying the maximum number of parallel instances.
// This is optional. It limits the number of database connections made.
OperatorSettings settings = OperatorSettings.MAX_PARALLELISM(8);

// Create the JDBC writer
WriteToJDBC writer = graph.add(new WriteToJDBC(), settings);
writer.setDriverName("com.mysql.jdbc.Driver");
writer.setUrl("jdbc:mysql://dbserver:3306/test");
writer.setUser("test");
writer.setPassword("test");

// Create mapping to database columns
Map<String, String> mappings= new LinkedHashMap<String, String>();
mappings.put("r_user", "user");
mappings.put("r_movieid", "movieid");
mappings.put("r_rating", "rating");
mappings.put("r_timestamp", "timestamp");
writer.setRenameMapping(mappings);

// Have to specify the table name using the JDBC writer
writer.setTableName("ratings");

// Set the SQL statement use to initialize the target table (as needed).
writer.setInitializeTableSQL(
          "create table if not exists ratings (r_user integer, r_movieid integer, r_rating integer, r_timestamp varchar(20))");

writer.setSqlWarningLimit(20);
Notes on the code example:
The target table name is specified. When using WriteToJDBC the table name property is required.
The SQL statement used to initialize the target table is specified. The SQL syntax may vary by database vendor.
Using the WriteToJDBC Operator in RushScript to append data to a table
// The SQL statement used to initialize the table
var initSQL = 'create table if not exists ratings (r_user integer, r_movieid integer, r_rating integer, r_timestamp varchar(20))';

// The database field remapping to use
var mapping = {
   "r_user":"user", 
   "r_movieid":"movieid", 
   "r_rating":"rating", 
   "r_timestamp":"timestamp"
};
  
//  Create the write to JDBC operator and set properties
dr.writeToJDBC(data, {
    driverName:'com.mysql.jdbc.Driver', 
    url:'jdbc:mysql://dbserver:3306/test', 
    user:'test', 
    password:'test', 
    tableName:'ratings', 
    initializeTableSQL:initSQL, 
    renameMapping:mapping, 
    sqlWarningLimit:20},
    maxParallelism:8);    // optionally set the max number of parallel instances wanted
Properties
The WriteToJDBC operator supports the following properties.
Name
Type
Description
commitInterval
int
The commit interval used by the JDBC driver. This is the number of operations to execute between commit points. Generally a larger commit interval provides better performance.
connectionFactory
The connection factory to use for acquiring connections. A default implementation is provided. A specialized factory can be provided as needed.
driverName
String
The class name of the JDBC driver to use, for example: "sun.jdbc.odbc.JdbcOdbcDriver".
errorAction
The action to take if a SQL error occurs while reading data from the database.
finalizeTableSQL
String
The SQL statement to execute after processing all the records. For example, this could be used to execute a SQL CREATE INDEX statement.
initializeTableSQL
String
The SQL statement to execute before processing any records. Usually contains a CREATE TABLE SQL statement to recreate the table.
isolationLevel
int
The isolation level to use for the transactions. Reference the JavaDoc for the java.sql.Connection.setTransactionIsolation method for isolation level explanations.
keyNames
String[]
The field names (JDBC column names) to use as keys.
password
String
The password for connecting to the database.
renameMapping
Map<String,String>
A map used to convert flow names to table column names. This should be an ordered (that is, LinkedHashMap) mapping of names. The keys in the map represent the original names in the record port. The values in the map represent column names in the table. Any fields not included in the mapping will be dropped.
sqlWarningLimit
int
The maximum number of warnings regarding SQL errors that an operator can emit. This property is only used if the errorAction property is set to ErrorAction.WARN.
tableName
String
The name of the database table to write.
url
String
The URL specifying the instance of the database to connect with. See the documentation for the database for formatting information.
user
String
The user name for connecting to the database.
Ports
The WriteToJDBC operator provides a single input port.
Name
Type
Get method
Description
input
getInput()
The input data to be written to the configured database.
Using the DeleteFromJDBC Operator to Write Database Deletes
The DeleteFromJDBC operator deletes rows from a target table using the records from an input port to determine which rows to delete. When deleting rows the key fields must be specified in the operator properties. The key field values will be used within the generated database delete statement to determine which rows qualify for deletion.
This operator provides an output port that will include the data used by the delete operation and an additional column containing the status codes returned by the database for each delete statement.
Initial and final SQL statements can be provided. The initial statement is run before any input data is handled. The final statement is executed after all input data has been processed.
The syntax of the supported SQL varies from database to database. Using standard SQL is generally a good practice to prevent errors when switching from one database to another.
The JDBC driver for the database being used must be in the Java classpath of the application using this operator. Databases generally provide a jar file that contains the JDBC driver and its dependencies.
Note that the DeleteFromJDBC operator operates in parallel by default. Each parallel instance will open a separate connection to the target database. To limit the number of database connections used by this operator, tune down the operator parallelism level.
Within Java, create an OperatorSettings instance with the maximum parallelism level set to the desired level. Pass the operator settings instance to the add() method of the composition context within which the operator is being added.
Within RushScript, set the maxParallelism level property within the property settings of the operator instance.
Code Examples
The following example is used to delete rows from a database table. The input data to the operator must contain the key values required to remove data from the target table. This example also demonstrates collecting the output so that the status codes can be recorded.
Using the DeleteFromJDBC operator in Java to delete rows from a table
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.operators.io.jdbc.OutputMode;
import com.pervasive.datarush.operators.io.jdbc.DeleteFromJDBC;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;

public class WriteMySQL {
    public static void main(String[] args) {

        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("WriteDelete");
        
        // Create a delimited text reader for the "ratings.txt" file
        ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
        reader.setFieldSeparator("::");
        reader.setHeader(true);
        RecordTokenType ratingsType = record(INT("r_user"), INT("r_movieid"), INT("r_rating"), STRING("r_timestamp"));
        reader.setSchema(TextRecord.convert(ratingsType));
        
        // Create JDBC writer
        DeleteFromJDBC writer = graph.add(new DeleteFromJDBC());
        writer.setDriverName("com.mysql.jdbc.Driver");
        writer.setUrl("jdbc:mysql://dbserver:3306/test");
        writer.setUser("root");
        writer.setPassword("localadmin");
        writer.setTableName("ratings");
        
        // You have to specify the key field names.
        writer.setKeyNames(new String[] {"r_user"});
        writer.setSqlWarningLimit(20);
        
        // Create a delimited text writer to collect the status codes
      WriteDelimitedText logger = app.add(new WriteDelimitedText("ratings_delete_status.txt", WriteMode.OVERWRITE));
        logger.setWriteSingleSink(true);
        logger.setHeader(true);
      
      // Connect operators
        graph.connect(reader.getOutput(), writer.getInput());
      graph.connect(writer.getOutput(), logger.getInput());
        
        // Compile and run the graph
        graph.run();
    }
}
Using the DeleteFromJDBC Operator in RushScript to delete data from a table
//  Create the delete from JDBC operator and set properties
dr.deleteFromJDBC(data, {
    driverName:'com.mysql.jdbc.Driver', 
    url:'jdbc:mysql://dbserver:3306/test', 
    user:'root', 
    password:'localadmin', 
    tableName:'ratings',
    keyNames:["r_user"],  
    sqlWarningLimit:20},
    maxParallelism:8);    // optionally set the max number of parallel instances wanted
Properties
The DeleteFromJDBC operator supports the following properties.
Name
Type
Description
commitInterval
int
The commit interval used by the JDBC driver. This is the number of operations to execute between commit points. Generally, a larger commit interval provides better performance.
connectionFactory
The connection factory to use for acquiring connections. A default implementation is provided. A specialized factory can be provided as needed.
driverName
String
The class name of the JDBC driver to use, for example: "sun.jdbc.odbc.JdbcOdbcDriver".
errorAction
The action to take if a SQL error occurs while reading data from the database.
finalizeTableSQL
String
The SQL statement to execute after processing all the records. For example, this could be used to execute a SQL "CREATE INDEX" statement.
initializeTableSQL
String
The SQL statement to execute before processing any records.
isolationLevel
int
The isolation level to use for the transactions. Reference the JavaDoc for the java.sql.Connection.setTransactionIsolation method for isolation level explanations.
keyNames
String[]
The field names (JDBC column names) to use as keys.
password
String
The password for connecting to the database.
renameMapping
Map<String,String>
A map used to convert flow names to table column names. This should be an ordered (that is, LinkedHashMap) mapping of names. The keys in the map represent the original names in the record port. The values in the map represent column names in the table. Any fields not included in the mapping will be dropped.
statusField
String
Sets the name of the field that will contain the updates statuses. Default: "DeleteStatus".
sqlWarningLimit
int
The maximum number of warnings regarding SQL errors that an operator can emit. This property is only used if the errorAction property is set to ErrorAction.WARN.
tableName
String
The name of the database table to write.
url
String
The URL specifying the instance of the database to connect with. See the documentation for the database for formatting information.
user
String
The user name for connecting to the database.
Ports
The DeleteFromJDBC operator provides a single input port.
Name
Type
Get method
Description
input
getInput()
Data identifying the records to delete from the target table.
The DeleteFromJDBC operator provides a single output port.
Name
Type
Get method
Description
output
getOutput()
The input data used and the returned status codes.
Using the UpdateInJDBC Operator to Write Database Updates
The UpdateInJDBC operator updates rows in a target table using records from an input port to determine which columns to update. When updating rows the key fields must be specified in the operator properties. The key field values will be used within the generated database update statement to determine which rows qualify for an update. Additionally the update fields may be specified in the operator properties. If the fields to update are not set all non-key fields will be used for the update.
This operator provides an output port that will include the data used by the update operation and an additional column containing the status codes returned be the database for each update statement.
Initial and final SQL statements can be provided. The initial statement is run before any input data is handled. The final statement is executed after all input data has been processed.
The syntax of the supported SQL varies from database to database. Using standard SQL is generally a good practice to prevent errors when switching from one database to another.
The JDBC driver for the database being used must be in the Java classpath of the application using this operator. Databases generally provide a .jar file that contains the JDBC driver and its dependencies.
Note that the UpdateInJDBC operator operates in parallel by default. Each parallel instance will open a separate connection to the target database. To limit the number of database connections used by this operator, tune down the operator parallelism level.
Within Java, create an OperatorSettings instance with the maximum parallelism level set to the desired level. Pass the operator settings instance to the add() method of the composition context within which the operator is being added.
Within RushScript, set the maxParallelism level property within the property settings of the operator instance.
Code Examples
The following example shows how to use the input data of the UpdateInJDBC operator to update database rows. The "r_user" and "r_movieid" fields from the ratings data are used as keys. The rating value is updated. This example also demonstrates collecting the output so the status codes can be recorded.
Using the UpdateInJDBC operator to update database rows
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.operators.io.jdbc.OutputMode;
import com.pervasive.datarush.operators.io.jdbc.UpdateInJDBC;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.record.SelectFields;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;

public class UpdateMySQL {
    public static void main(String[] args) {
        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("WriteMySQL");
        
        // Create a delimited text reader for the "ratings.txt" file
        ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
        reader.setFieldSeparator("::");
        reader.setHeader(true);
        RecordTokenType ratingsType = record(INT("r_user"), INT("r_movieid"), INT("r_rating"), STRING("r_timestamp"));
        reader.setSchema(TextRecord.convert(ratingsType));
        
        // Create JDBC writer
        UpdateInJDBC writer = graph.add(new UpdateInJDBC());
        writer.setDriverName("com.mysql.jdbc.Driver");
        writer.setUrl("jdbc:mysql://dbserver:3306/test");
        writer.setUser("root");
        writer.setPassword("localadmin");
        writer.setTableName("ratings");
        
        // Specify the key fields names used as keys in the SQL update commands.
        writer.setKeyNames(new String[] {"r_user", "r_movieid"});
        writer.setSqlWarningLimit(20);
      
      // Specify the name of the field that should be updated
      writer.setUpdateFields(new String[] {"r_rating"});

      // Create a delimited text writer to collect the status codes
      WriteDelimitedText logger = app.add(new WriteDelimitedText("ratings_update_status.txt", WriteMode.OVERWRITE));
        logger.setWriteSingleSink(true);
        logger.setHeader(true);
      
      // Connect operators
        graph.connect(reader.getOutput(), writer.getInput());
      graph.connect(writer.getOutput(), logger.getInput());

      // Compile and run the graph
        graph.run();
    }
}
Using the UpdateInJDBC Operator in RushScript to update data in a table
//  Create the update in JDBC operator and set properties
dr.updateInJDBC(data, {
    driverName:'com.mysql.jdbc.Driver', 
    url:'jdbc:mysql://dbserver:3306/test', 
    user:'root', 
    password:'localadmin', 
    tableName:'ratings', 
    keyNames:["r_user", "r_movieid"], 
    updateFields:["r_rating"], 
    sqlWarningLimit:20},
    maxParallelism:8);    // optionally set the max number of parallel instances wanted
Properties
The UpdateInJDBC operator supports the following properties.
Name
Type
Description
commitInterval
int
The commit interval used by the JDBC driver. This is the number of operations to execute between commit points. Generally, a larger commit interval provides better performance.
connectionFactory
The connection factory to use for acquiring connections. A default implementation is provided. A specialized factory can be provided as needed.
driverName
String
The class name of the JDBC driver to use, for example: "sun.jdbc.odbc.JdbcOdbcDriver".
errorAction
The action to take if a SQL error occurs while reading data from the database.
finalizeTableSQL
String
The SQL statement to execute after processing all the records. For example, this could be used to execute a SQL CREATE INDEX statement.
initializeTableSQL
String
The SQL statement to execute before processing any records.
isolationLevel
int
The isolation level to use for the transactions. See the JavaDoc for the java.sql.Connection.setTransactionIsolation method for isolation level explanations.
keyNames
String[]
The field names (JDBC column names) to use as keys.
password
String
The password for connecting to the database.
renameMapping
Map<String,String>
A map used to convert flow names to table column names. This should be an ordered (that is, LinkedHashMap) mapping of names. The keys in the map represent the original names in the record port. The values in the map represent column names in the table. Any fields not included in the mapping will be dropped.
statusField
String
Sets the name of the field that will contain the updates statuses. Default: "UpdateStatus".
sqlWarningLimit
int
The maximum number of warnings regarding SQL errors that an operator can emit. This property is only used if the errorAction property is set to ErrorAction.WARN.
tableName
String
The name of the database table to write.
updateFields
String[]
The field names (JDBC column names) to use as update fields.
url
String
The URL specifying the instance of the database to connect with. See the documentation for the database for formatting information.
user
String
The user name for connecting to the database.
Ports
The UpdateInJDBC operator provides a single input port.
Name
Type
Get method
Description
input
getInput()
Data identifying the the updates to apply to the target table.
The UpdateInJDBC operator provides a single output port.
Name
Type
Get method
Description
output
getOutput()
The input data used and the returned status codes.
Using the WriteDelimitedText Operator to Write Delimited Text
The WriteDelimitedText operator writes a stream of records as delimited text. The writer performs no reordering or filtering of the fields. If this is desired, the stream should be preprocessed with the appropriate operator.
Delimited text supports up to three distinct user-defined sequences within a record, used to identify field boundaries:
A field separator: found between individual fields. By default this is a comma character.
A field start delimiter: marking the beginning of a field value. By default this is the double quote character.
A field end delimiter: marking the end of a field value. By default this is the double quote character.
The field separator cannot be empty. The start and end delimiters can be the same value. They can also both (but not individually) be empty, signifying the absence of field delimiters. The writer will always delimit fields, regardless of whether the values require it. Should a delimited field need to contain the end delimiter, it is escaped from its normal interpretation by duplicating it. For instance, the value ab""c will be formatted as "ab"c".
The writer accepts a RecordTextSchema to provide formatting information for fields, as well as header row information. It is not required to provide a schema; however, one can be generated from the input data type using default formatting based on the data type. The writer also supports a pluggable discovery mechanism for creating a schema based on the input type, should fine-grained dynamic control be required.
Any schema, supplied or discovered, must be compatible with the input to the reader. To be compatible, a schema must contain a field definition with an assignable type for each field named in the input. Fields present in the schema but not the input are permitted with the missing field assuming a null value.
Delimited text data may or may not have a header row. The header row is delimited as usual but contains the names of the fields in the data portion of the record. The writer will emit a header row if it should be present and the write is not appending to an existing file.
When writing delimited text data, the metadata containing information about the ordering and partitioning of the data may optionally be saved with the data. This can provide a performance improvement when reading the data later since the data may not need to be re-sorted or partitioned again to ensure it meets various requirements of the graph.
As with all file-based write operators, the WriteDelimitedText operator will treat its sink as a directory by default. A file-per-application data stream will be output in this case. This can be overridden using the writeSingleSink property.
See Using the ReadDelimitedText Operator to Read Delimited Text to read data in delimited text format.
Code Examples
The following code example demonstrates using the WriteDelimitedText operator to write a set of data files.
Using the WriteDelimitedText operator in Java
// Construct the writer with the path where the results are to be written.
// Use OVERWRITE mode indicating the output file should be overwritten if it exists already.
WriteDelimitedText writer = graph.add(
              new WriteDelimitedText("results/test-ratings", WriteMode.OVERWRITE));
// No start/end field delimiter wanted
writer.setFieldDelimiter("");
// Output a header row at the beginning of the file with field names
writer.setHeader(true);

// Connect the output of the reader to the input of the writer
graph.connect(operator.getOutput(), writer.getInput());
Using the WriteDelimitedText operator in RushScript
// Note that this operator doesn't return anything
dr.writeDelimitedText(data, {target:'results/test-ratings', mode:'OVERWRITE', fieldDelimiter:'', header:true});
Note that the writeSingleSink property was not set. The property defaults to false. Running this application will result in multiple files being written into the target directory. The files will be named "part%n" where %n is replaced with the partition number. The number of files written will correspond to the parallelism configured for the DataFlow engine.
The partial screen shot below shows the results directory that contains the output of the sample application. Eight part files were written since the engine parallelism level was set to eight. Writing files in this manner is optimal, as the individual data streams can be written in parallel.
/download/attachments/20480516/eightfiles.jpg?version=1&modificationDate=1405715119624&api=v2
If for some reason the data is needed in a single file, enable the writeSingleSink property. The following code fragment amends the example above to add the setting of this property. The output will be a single file containing all of the resultant data. Performance will be adversely affected as the data must be gathered into a single stream for writing.
Using a single sink
// Create a delimited text writer
WriteDelimitedText writer = graph.add(new WriteDelimitedText("results/test-ratings.txt", WriteMode.OVERWRITE));
writer.setFieldDelimiter("");
writer.setHeader(true);
writer.setWriteSingleSink(true);
Properties
The WriteDelimitedText operator supports the following properties.
Name
Type
Description
charset
Charset
The character set used by the data source. Default: ISO-8859-1.
charsetName
String
The character set used by the data source by name.
encodeBuffer
int
The size of the buffer, in bytes, used to encode data. By default, this will be automatically derived using the character set and read buffer size.
encoding
Properties that control character set encoding.
errorAction
CodingErrorAction
The error action determines how to handle errors encoding the output data into the configured character set. The default action is to replace the faulty data with a replacement character.
fieldDelimiter
String
Delimiter used to denote the boundaries of a data field.
fieldEndDelimiter
String
Ending delimiter used to denote the boundaries of a data field.
fieldSeparator
String
Delimiter used to define the boundary between data fields.
fieldStartDelimiter
String
Starting delimiter used to denote the boundaries of a data field.
formatOptions
The formatting options used by the reader. This sets all format options at once.
header
String
Whether to expect a header row in the source. The header row contains field names.
lineComment
String
The character sequence indicating a line comment. Lines beginning with this sequence are ignored.
mode
Determines how the writer should handle an existing target.
nullIndicator
String
Text value to substitute for null data values. Empty string by default.
recordSeparator
String
Value to use as a record separator.
replacement
String
Replacement string to use when encoding error policy is replacement. Default: '?'
saveMetadata
boolean
Whether the writer should save metadata information about the ordering and distribution of the data. Default: false.
schema
The record schema expected in the delimited text source.
target
ByteSink, Path, or String
The sink to write. Path and String parameters are converted to sinks. If the writer is parallelized, this is interpreted as a directory in which each partition will write a fragment of the entire input stream. Otherwise, it is interpreted as the file to write.
writeBuffer
int
The size of the I/O buffer, in bytes, to use for writes. Default: 64K.
writeSingleSink
boolean
Whether the writer should produce a single output file or multiple ones. By default, an output file will be produced for each partition, if the target sink supports this.
writeOnClient
boolean
Determines whether writes are performed by the local client or in the cluster. This option also disables the parallelism during the write, producing a merge of the data on the local client. By default, writes are performed in the cluster, if executed in a distributed context.
Ports
The WriteDelimitedText operator provides a single input port.
Name
Type
Get method
Description
input
getInput()
The input data to be written to the provided sink in delimited text format.
Using the WriteFixedText Operator to Write Fixed-width Text
The WriteFixedText operator writes a record dataflow as a text file of fixed-width records. The writer requires a schema (FixedWidthTextRecord) to determine how field values are positioned and formatted. Fields are mapped by name; all fields defined in the schema must be present in the input (and vice versa), although they need not be in the same order. The order of fields in the formatted record is determined by the order in the supplied schema. Records in the file are separated by a non-empty, user-defined character sequence.
The record schema can be manually constructed using the API provided, though this metadata is often persisted externally. Support for Pervasive Data Integrator structured schema descriptors (.schema files) is provided by the StructuredSchemaReader class.
See Using the ReadFixedText Operator to Read Fixed-width Text to read fixed-width text data.
Code Examples
The following example uses the WriteFixedText operator to write a file in fixed-width format.
Using the WriteFixedText operator in Java
// Define the schema for the account data. This schema can be used
// for reading and writing the account data.
FixedWidthTextRecord schema = new FixedWidthTextRecord(new TextConversionDefaults(StringConversion.NULLABLE_TRIMMED));
schema.defineField("accountNumber", new PaddedTextType(TextTypes.STRING, 9, ' ', Alignment.LEFT));
schema.defineField("name", new PaddedTextType(TextTypes.STRING, 21, ' ', Alignment.LEFT));
schema.defineField("companyName", new PaddedTextType(TextTypes.STRING, 31, ' ', Alignment.LEFT));
schema.defineField("address", new PaddedTextType(TextTypes.STRING, 35, ' ', Alignment.LEFT));
schema.defineField("city", new PaddedTextType(TextTypes.STRING, 16, ' ', Alignment.LEFT));
schema.defineField("state", new PaddedTextType(TextTypes.STRING, 2, ' ', Alignment.LEFT));
schema.defineField("zip", new PaddedTextType(TextTypes.STRING, 10, ' ', Alignment.LEFT));
schema.defineField("emailAddress", new PaddedTextType(TextTypes.STRING, 25, ' ', Alignment.LEFT));
schema.defineField("birthDate", new PaddedTextType(TextTypes.FORMATTED_DATE(new SimpleDateFormat("MM/dd/yyyy")), 10, ' ', Alignment.LEFT));
schema.defineField("accountCodes", new PaddedTextType(TextTypes.STRING, 11, ' ', Alignment.LEFT));
schema.defineField("standardPayment", new PaddedTextType(TextTypes.JAVA_DOUBLE, 6, ' ', Alignment.LEFT));
schema.defineField("payment", new PaddedTextType(TextTypes.JAVA_DOUBLE, 7, ' ', Alignment.LEFT));
schema.defineField("balance", new PaddedTextType(TextTypes.JAVA_DOUBLE, 6, ' ', Alignment.LEFT));

// Create fixed text writer
WriteFixedText writer = graph.add(new WriteFixedText("results/accounts-fixedwidth.txt", WriteMode.OVERWRITE));
writer.setSchema(schema);
writer.setWriteSingleSink(true);
Here is a fragment of the data output by the example WriteFixedText operator.
01-000667George P Schell      Market Place Products          334 Hilltop Dr                     Mentor          OH44060-1930warmst864@aol.com        02/28/1971XA         101.0 100.0  15.89 
01-002423Marc S Brittan       Madson & Huth Communication Co 5653 S Blackstone Avenue, #3E      Chicago         IL60637-4596mapper@tcent.net         06/30/1975BA         144.0 144.0  449.92
01-006063Stephanie A Jernigan La Salle Clinic                77565 Lorain                       Akron           OH44325-4002dram@akron.net           11/02/1941EB|CB      126.0 126.0  262.98
01-010474Ernie Esser          Town & Country Electric Inc.   56 Pricewater                      Waltham         MA2453      hazel@bentley.net        12/15/1962JA|RB      127.0 127.0  271.75
01-010852Robert A Jacoby      Saturn of Baton Rouge          4001 Lafayette                     Baton Rouge     LA70803-4918din33@norl.com           12/22/1985ED|EA|RB|KA142.0 150.0  423.01
01-011625James C Felli        Bemiss Corp.                   23A Carolina Park Circle           Spartanburg     SC29303-9398cadair@gw.com            02/21/1940SB         151.0 155.0  515.41
01-018448Alan W Neebe         Georgia State Credit Union     PO Box 159                         Demorest        GA30535-1177delores@truett.com       01/31/1960MA|ED|SB   113.0 120.0  131.89
01-018595Alexander Gose       Office Support Services        436 Green Mountain Circle          New Paltz       NY12561-0023dams@matrix.net          06/19/1940EC         147.0 147.0  477.09
The following example demonstrates writing fixed-width data using RushScript.
Using the WriteFixedText operator in RushScript
var accountsFixedSchema = dr.schema({type:'FIXED'})
    .nullable(true)
    .trimmed(true)
    .padChar(' ')
    .alignment('LEFT')
    .STRING("accountNumber", {size:9})
    .STRING("clientName", {size:21})
    .STRING("companyName", {size:31})
    .STRING("streetAddress", {size:35})
    .STRING("city", {size:16})
    .STRING("state", {size:2})
    .STRING("zip", {size:10})
    .STRING("emailAddress", {size:25})
    .DATE("birthDate", {pattern:'MM/dd/yyyy', size:10})
    .STRING("accountCodes", {size:11})
    .DOUBLE("standardPayment", {pattern:'0.00', size:6})
    .DOUBLE("payment", {pattern:'0.00', size:7})
    .DOUBLE("balance", {pattern:'0.00', size:6});

// Read fixed-width data data
dr.writeFixedText(
    data, 
    {
        target:'/path/to/file.txt', 
        mode:'OVERWRITE', 
        schema:accountsFixedSchema
    });
Properties
The WriteFixedText operator supports the following properties.
Name
Type
Description
charset
Charset
The character set used by the data source. Default: ISO-8859-1.
charsetName
String
The character set used by the data source by name.
encodeBuffer
int
The size of the buffer, in bytes, used to encode data. By default, this will be automatically derived using the character set and read buffer size.
encoding
Properties that control character set encoding.
errorAction
CodingErrorAction
The error action determines how to handle errors encoding the output data into the configured character set. The default action is to replace the faulty data with a replacement character.
formatOptions
The formatting options used by the reader. This sets all format options at once.
mode
Determines how the writer should handle an existing target.
nullIndicator
String
Text value to substitute for null data values. Empty string by default.
recordSeparator
String
Value to use as a record separator.
replacement
String
Replacement string to use when encoding error policy is replacement. Defaul: '?'
schema
The record schema expected in the delimited text source.
target
ByteSink, Path, or String
The sink to write. Path and String parameters are converted to sinks. If the writer is parallelized, this is interpreted as a directory in which each partition will write a fragment of the entire input stream. Otherwise, it is interpreted as the file to write.
writeBuffer
int
The size of the I/O buffer, in bytes, to use for writes. Default: 64K.
writeSingleSink
boolean
Whether the writer should produce a single output file or multiple ones. By default, an output file will be produced for each partition if the target sink supports this.
writeOnClient
boolean
Determines whether writes are performed by the local client or in the cluster. This option also disables the parallelism during the write, producing a merge of the data on the local client. By default, writes are performed in the cluster if executed in a distributed context.
Ports
The WriteFixedText operator provides a single input port.
Name
Type
Get method
Description
input
getInput()
The input data to be written to the provided sink in fixed-width text format.
Using the WriteSink Operator to Write Generic Targets
The WriteSink operator writes a stream of records to a data sink. The sink accepts a sequence of bytes that represent formatted records, identical in logical structure. The mapping between physical and logical structure is encapsulated in a format descriptor, which must be provided.
This operator is low level, providing a generalized model for reading files in a distributed fashion. Typically, WriteSink is not directly used in a graph. Instead, it is indirectly used through a composite operator such as one derived from AbstractWriter, providing a more appropriate interface to the end user.
Parallelized writes are supported by creating multiple output sinks from the original target, called fragments, each representing the portion of data on one partition. In the case of a write to a file system, this would result in a directory of files. To write in parallel the target must support the concept of fragmenting. If a target does not support fragmenting the write is forced to be non-parallel.
A port is provided that signals completion of the writer. This can be used to express a dependency between operators based on the target having been successfully written.
The writer makes a best-effort attempt to validate the target before execution, but cannot always guarantee correctness depending on the nature of the target. This is done to try to prevent failures that may result in the loss of a significant amount of work, such as when misconfigured graphs execute in a late phase.
Tip:  This is a low-level operator that typically is not directly used. It can be used with a custom data format. A custom data format may be needed to support a format not provided by the DataFlow library.
Code Example
This example code fragment demonstrates how to set up a writer for a generic file type.
Using the WriteSink operator
WriteSink writer = new WriteSink();
writer.setTarget(new BasicByteSink("filesink"));
writer.setMode(WriteMode.OVERWRITE);
writer.setFormat(new DelimitedTextFormat(TextRecord.convert(record(INT("intfield"),STRING("stringfield"))),
                 new FieldDelimiterSettings(), 
                 new CharsetEncoding()));
Properties
The WriteSink operator supports the following properties.
Name
Type
Description
format
The data format for the configured source.
formatOptions
The formatting options used by the reader. This sets all format options at once.
mode
Determines how the writer should handle an existing target.
target
ByteSink, Path, or String
The sink to write. Path and String parameters are converted to sinks. If the writer is parallelized, this is interpreted as a directory in which each partition will write a fragment of the entire input stream. Otherwise, it is interpreted as the file to write.
writeSingleSink
boolean
Whether the writer should produce a single output file or multiple ones. By default, an output file will be produced for each partition if the target sink supports this.
Ports
The WriteSink operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
Data to be formatted and written to the configured sink.
Using the WriteStagingDataset Operator to Write Staging Data Sets
The WriteStagingDataset operator writes a sequence of records to disk in an internal format for staged data. Staged data sets are useful as they are more efficient than text files, being stored in a compact binary format. If a set of data must be read multiple times, significant savings can be achieved by converting it into a staging data set first.
It is generally best to perform parallel writes, creating multiple files. This allows reads to be parallelized effectively, as the staging format is not splittable.
There are several use cases for directly using staging data sets. A few examples are:
A particular task to be accomplished must be broken into several DataFlow applications. Data from each application must be passed to the next. Staging data sets can be used to capture the intermediate data, allowing for quick and easy access.
Data within a database is being used to build a predictive model. This development will require many iterations of algorithm testing with the data.
For this use case, the most efficient route is to first read the data from the source database and write it to staging data sets. Then use the staged data as input into the predictive analytic development. The one-time read from the database is overhead. However, given that the source data will be read many times, the savings will be substantial.
To read data in the staging data set format, see Using the ReadStagingDataset Operator to Read Staging Datasets.
Code Examples
The following code example reads from a delimited text file and writes the data to a staging data set.
Using the WriteStagingDataset operator in Java
// Write to a staging dataset overwriting existing content
WriteStagingDataset datasetWriter = graph.add(
     new WriteStagingDataset("results/ratings-stage", WriteMode.OVERWRITE));
Using the WriteStagingDataset operator in RushScript
// Note: no return value
dr.writeStagingDataset(data, {target:'results/ratings-stage', mode:'OVERWRITE'});
The following partial screen shot shows the results directory that contains the output of the sample application. Eight part files were written since the engine parallelism level was set to eight. Writing files in this manner is optimal, as the individual data streams can be written in parallel. This is recommended for staging data sets because the format is not splittable. Having a single file output limits the parallelism when reading.
/download/attachments/20480510/eightfiles.jpg?version=1&modificationDate=1405715119421&api=v2
Properties
The WriteStagingDataset operator supports the following properties.
Name
Type
Description
blockSize
int
Sets the block size, in rows, used for encoding data. Default: 64 rows. This setting is of most importance when using the COLUMNAR storage format.
format
DataStorageFormat
The data set format used to store data. By default, this is set to COMPACT_ROW.
formatOptions
The formatting options used by the reader. This sets all format options at once.
mode
Determines how the writer should handle an existing target.
selectedFields
List<String>
The list of input fields to include in the output. Use this to limit the fields written to the output.
target
ByteSource, Path, or String
Source of the input data to parse as delimited text.
target
ByteSink, Path, or String
The sink to write. Path and String parameters are converted to sinks. If the writer is parallelized, this is interpreted as a directory in which each partition will write a fragment of the entire input stream. Otherwise, it is interpreted as the file to write.
writeBuffer
int
The size of the I/O buffer, in bytes, to use for writes. Default: 64K.
writeSingleSink
boolean
Whether the writer should produce a single output file or multiple ones. By default, an output file will be produced for each partition if the target sink supports this.
writeOnClient
boolean
Determines whether writes are performed by the local client or in the cluster. This option also disables the parallelism during the write, producing a merge of the data on the local client. By default, writes are performed in the cluster if executed in a distributed context.
Ports
The WriteStagingDataset operator provides a single input port.
Name
Type
Get method
Description
input
getInput()
The input data to be written to the provided sink in staging data format.
Using the WriteARFF Operator to Write Sparse Data
The WriteARFF operator writes files using the Attribute-Relation File Format (ARFF). ARFF supports both sparse and dense formats. The format to use is specified with the format mode property. The default mode is sparse. When the input data is highly sparse, the sparse mode can save space since it only writes non-zero values. However, if the data is not very sparse, using sparse mode can actually cause the resultant file to be larger. Use dense mode for dense data.
Note that when writing sparse data that contains enumerated types, the zero ordinal category will always be treated as sparse and so will not appear in the output file. This is normal and expected behavior. The enumerated type is captured in the metadata. When the file is read, the zero ordinal values will be restored as expected.
ARFF provides a metadata section at the start of the file. The metadata may contain several items:
Comments: free-form, user-supplied comments.
Relation name: a user name given to the relation contained within the ARFF file.
Attribute definitions: defines the field name and type of each data field. The attribute definitions define the schema of the file. The definitions are generated by the operator.
ARFF is useful when the input data is sparse. Since ARFF includes metadata such as the file schema, it is also very useful and convenient to use. The dense format is a simple CSV format and as such is very similar to the format written by the WriteDelimitedText operator.
ARFF data can be read Using the ReadARFF Operator to Read Sparse Data.
Code Examples
The following example writes an ARFF file in sparse mode.
Using the WriteARFF operator in Java
// Write to an ARFF file in sparse mode
WriteARFF arffWriter = graph.add(new WriteARFF("results/ratings-arff.txt", WriteMode.OVERWRITE));
arffWriter.setFormatMode(ARFFMode.SPARSE);
arffWriter.setRelationName("Movie Ratings");
arffWriter.addComment("This is an ARFF file written using DataFlow");
arffWriter.setWriteSingleSink(true);
Notes from the example:
The ARFF mode is set to sparse. This enables writing the data in sparse format. Any numeric data values equal to zero will not be written to the output, thereby saving space. When the data is sparse, using the sparse format is very efficient.
The relation name is set. This is optional. The relation name is part of the ARFF metadata. It is used by consumers of ARFF files to distinguish the data set.
A comment is added. This is also optional. Comments are added at the beginning of the ARFF metadata section. When reading an ARFF file the comments can also be retrieved. There is no practical limit to the number of comment lines that may be added.
The writeSingleSink property is enabled. This option forces the writing of a single output file. The default is to write multiple files in parallel.
Using the WriteARFF operator in RushScript
dr.writeARFF(data, {
        target:'results/ratings-arff.txt', 
        mode:'OVERWRITE', 
        formatMode:'SPARSE', 
        relationName:'Movie Ratings', 
        comments:'This is an ARFF file written using DataFlow', 
        writeSingleSink:'true'});
The following is the first few lines of the ARFF file written with the code example. The metadata appears at the top of the file. Metadata statements begin with the "@" character. Comments begin with the "%" character. The data follows after the DATA statement. Data rows in sparse format are wrapped with curly braces. Data values are comma-separated. Each data value begins with the field index followed by the data value.
% This is an ARFF file written using DataFlow
@relation 'Movie Ratings'
@attribute userID integer % int32
@attribute movieID integer % int32
@attribute rating integer % int32
@attribute timestamp string
@data
{0 1,1 1193,2 5,3 978300760}
{0 1,1 661,2 3,3 978302109}
{0 1,1 914,2 3,3 978301968}
{0 1,1 3408,2 4,3 978300275}
{0 1,1 2355,2 5,3 978824291}
{0 1,1 1197,2 3,3 978302268}
{0 1,1 1287,2 5,3 978302039}
{0 1,1 2804,2 5,3 978300719}
{0 1,1 594,2 4,3 978302268}
Properties
The WriteARFF operator supports the following properties.
Name
Type
Description
charset
Charset
The character set used by the data source. By default ISO-8859-1 is used.
charsetName
String
The character set used by the data source by name.
comments
List<String>
List of comments to add to the target file.
encodeBuffer
int
The size of the buffer, in bytes, used to encode data. By default, this will be automatically derived using the character set and read buffer size.
encoding
Properties that control character set encoding.
errorAction
CodingErrorAction
The error action determines how to handle errors encoding the output data into the configured character set. The default action is to replace the faulty data with a replacement character.
fieldDelimiter
char
The character to use to delimit a field. The value is limited to either a single or double quote.
formatMode
The mode with which to write the file. The supported modes are SPARSE and DENSE.
recordSeparator
String
The string value to use as the record separator. This string will be output at the end of each record written. The default value is the system dependent record separator. The value is limited to the Windows or UNIX record separators.
relationName
String
The relation name attribute. In ARFF, the relation name is captured in the metadata with the tag "@attribute".
replacement
String
Replacement string to use when encoding error policy is replacement. Default: '?'
mode
Determines how the writer should handle an existing target.
target
ByteSink, Path, or String
The sink to write. Path and String parameters are converted to sinks. If the writer is parallelized, this is interpreted as a directory in which each partition will write a fragment of the entire input stream. Otherwise, it is interpreted as the file to write.
writeBuffer
int
The size of the I/O buffer, in bytes, to use for writes. Default: 64K.
writeSingleSink
boolean
Whether the writer should produce a single output file or multiple ones. By default, an output file will be produced for each partition if the target sink supports this.
writeOnClient
boolean
Determines whether writes are performed by the local client or in the cluster. This option also disables the parallelism during the write, producing a merge of the data on the local client. By default, writes are performed in the cluster if executed in a distributed context.
Ports
The WriteARFF operator provides a single input port.
Name
Type
Get method
Description
input
getInput()
The input data to be written to the provided sink in ARFF.
Using the ForceRecordStaging Operator to Explicitly Stage Data
Using the ForceRecordStaging operator forces the operators on the input and output sides of the operator to execute sequentially, instead of concurrently. In a dataflow graph, consumers and producers run simultaneously, exchanging data as the operators are ready. This permits the graph to exploit pipeline parallelism. The data is staged to disk in configured temporary storage spaces.
In some cases, it may be necessary to avoid this concurrency. The simplest example is:
Operator A produces data.
Operator B consumes data from A, producing data only when all data from A has been consumed. (Calculating a total, for instance.)
Operator C consumes the output of both A and B, but does not consume any data from A until after consuming data from B.
In this case, a memory usage dilemma, known as queue expansion, arises. B needs all of A's data, but C does not read anything from A until B produces some data. As the data unread by C cannot be discarded, it must be saved until it is read. To minimize memory requirements, the lag between readers is limited. This limit is raised when required, but in this case, it requires the entire output of A, which could easily exceed memory.
Usually, this is automatically handled by the framework, saving data to disk. This is done in a optimistic fashion, avoiding disk I/O and associated performance degradation until necessary at the cost of a (small) potential risk of graph failure due to memory being exhausted. This operator provides a pessimistic approach to the problem that forces all data to disk, with all the associated performance costs.
Tip:  Use the ForceRecordStaging operator directly in situations where it is known that queue expansion will occur. Direct usage prevents the overhead of handling queue expansion.
Code Example
The code example provides a common use case requiring use of the ForceRecordStaging operator. A data source is aggregated using the Group operator. Aggregation causes a reduction of the input data. The output of Group is then wired into a Join operator. This allows joining the aggregated results back to the original data. However, only a few key groups are known to exist. This implies the join will consume large amounts of the original source while waiting for data from the group. Forcing the data to be staged prevents queue expansion and the overhead of handling the expansion at run time.
Note that the example is not complete: code to produce the source is not provided, and properties are not set on the group and join operators.
Using the ForceRecordStaging operator
// Aggregate the source data
Group group = graph.add(new Group());
graph.connect(source, group.getInput());

// Force staging of source data to prevent queue expansion.
// The group operator reduces the input row count considerably.
ForceRecordStaging forceStaging = graph.add(new ForceRecordStaging());
graph.connect(source, forceStaging.getInput());

// Join the original data to the aggregated results. Use the staged data
// instead of the original source to prevent queue expansion.
Join join = graph.add(new Join());
graph.connect(forceStaging.getOutput(), group.getOutput());
Using the ForceRecordStaging operator in RushScript
// Force a dataset to be staged
var stagedData = dr.forceRecordStaging(data);
Properties
The ForceRecordStaging operator has no properties.
Ports
The ForceRecordStaging operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
The input data.
The ForceRecordStaging operator provides a single output port.
Name
Type
Get Method
Description
output
getOutput()
Output data of the operator. The input data is replicated to this port after first staging to disk.
Using the WriteORC Operator to Write Apache ORC Files
The WriteORC operator writes data file in Apache Optimized Row Columnar (ORC) format. The ORC format is supported by Apache Hive.
The operator is registered as writeORC for using as JSON or scripting. This operator is based on AbstractWriter operator and supports the features from the base class, particularly in parallel writing. Each par file created in this way is a logical ORC file. Similar to ReadORC operator, the WriteORC operator can access Hive libraries using the shim layer. The operator uses the ORCFile or Writer API.
Note:  The operator requires a Hadoop module configuration to be enabled even if the workflow does not run on the cluster or access HDFS.
For more information about ORC format, go to the ORC file format manual.
A few DataFlow data types such as CHAR and TIME are not supported for writing because these do not have a matching ORC type. If a schema containing the unsupported data types is found, then the WriteORC operator will throw an exception at the time of compiling a graph.
In general, DataFlow types are assigned to the ORC schema types as mentioned in the following table.
DataFlow Type
ORC Schema Type
BOOLEAN
BOOLEAN
LONG
BIGINT
BINARY
BINARY
DATE
DATE
NUMERIC
DECIMAL
DOUBLE
DOUBLE
FLOAT
FLOAT
INT
INT
STRING
STRING
TIMESTAMP
TIMESTAMP
Properties
The WriteORC operator provides the following properties that is derived directly from WriterOptions of the ORC.
Properties
Type
Description
blockPadding
boolean
Sets whether the HDFS blocks are padded to prevent stripes from straddling blocks.
bufferSize
int
The size of the memory buffers used for compressing and storing the stripe in memory.
compression
ORCCompression
(mapping to CompressionKind)
Sets the generic compression that is used to compress the data.
Lempel–Ziv–Oberhumer (LZO) data compression is not supported. This is excluded from the ORCCompression enum.
rowIndexStride
int
Sets the distance between entries in the row index. This value must be 0 (no index) or greater than or equal to 1000.
stripeSize
long
Sets the stripe size for the file.
version
ORCVersion
(mapping to Version)
Sets the version of the file that will be written.
formatOptions
Formatting options used by the reader. This property sets all the format options at once.
mode
Defines the way for the writer to handle an existing target.
Appending to an existing file is not supported. If you specify APPEND option as the mode property value, then it throws an exception while compiling.
saveMetadata
boolean
Specifies whether or not the writer should save metadata information about the ordering and distribution of data. Default value is false.
target
ByteSink, Path, or String
Defines the sink to write. The Path and String parameters are converted to sinks.
If the writer is parallelized, then this is interpreted as a directory in which each partition writes a fragment of the entire input stream. Else, it is interpreted as the file to write.
writeBuffer
int
Specifies the size of I/O buffer (in bytes) to use for write operation. Default: 64000.
writeSingleSink
boolean
Specifies whether the writer should produce a single or multiple output files. By default, an output file is created for each partition only if the target sink supports.
writeOnClient
boolean
Defines whether or not the write operation is performed by the local client or in the cluster. This option also disables the parallelism during the write and merges the data on the local client. By default, the write operation is performed in the cluster if executed in a distributed context.
Note:  Before running the workflow, ensure that the client configuration and the jar files are added to the classpath. For more information, see Integrating DataFlow with Hadoop.
Code Examples
The following example provides writing the ORC file data using Java.
Using the WriteORC operator in Java
WriteORC writer = graph.add(new WriteORC("hdfs://localhost:9000/dforc.orc"));
writer.setMode(WriteMode.OVERWRITE);
writer.setCompression(ORCCompression.SNAPPY);
writer.setBlockPadding(true);
The following example provides writing the ORC file data using RushScript.
Using the WriteORC operator in RushScript
var writer = dr.writeORC("orc-out", source, {target:"file:///data/dforc.orc", writeSingleSink:true});
Port
The WriteORC operator provides a single input port.
Name
Type
Get Method
Description
Input
getInput()
Input data that should be written to the provided sink in delimited text format.
Database Loaders
Database loaders provide the ability to transfer data into a database quickly and efficiently. They bulk-load data using APIs or utilities specific to the database implementation. Unlike JDBC, they are not generic. But they are able to provide a performance boost over JDBC access.
DataFlow provides the following database loaders:
Using the LoadActianVector Operator to Load Actian Vector
Using the LoadMatrix Operator to Load Actian Matrix
Using the MatrixSink Operator to Load Actian Matrix from SQL
Using the LoadActianVector Operator to Load Actian Vector
The LoadActianVector operator is used to bulk-load data into an Actian Vector database. The operator provides a single input port containing the data to load into Vector. There are several methods for loading the data that are supported:
Direct: Data is streamed directly into the Vector engine. This method is generally faster than the other methods. It is also more efficient because data is streamed directly into the Vector engine without having to stage it first. This method also supports execution within a cluster environment. As such, data can be efficiently moved from HDFS into Vector. Currently, direct loading is only supported on Linux and Windows 64-bit platforms.
vwload: The vwload utility provides the generally fastest method to bulk load data into Vector. The utility can only be run on the Vector server machine. The user running vwload must have DBA privileges.
SQL COPY: This command can be executed on machines remote to the Vector server. The Vector client software must be installed and configured to use SQL COPY. The user does not have to have DBA privileges to run SQL COPY. SQL COPY does require copy permission on the target table.
COPY VWLOAD: This command can be executed on machines remote to the Vector server. The command allows the vwload utility to be executed using a SQL command. This command is useful if the vwload utility is not available on the path of the target instance. COPY VWLOAD does require copy permission on the target table.
Direct Load
When using the direct load capability, the data is streamed from the input port directly into the Vector engine. Direct loading can run in parallel and supports execution within a cluster environment such as Hadoop. Direct loading can be used to copy data from HDFS into a Vector instance. When run within a Hadoop cluster, the reading of the data, formatting and sending to Vector operations are run distributed, taking full advantage of the Hadoop resources.
When loading to Vector database up to version 4.2, direct loader sorts the data on clustered index before loading. Direct load method leverages parallel execution capabilities in DataFlow to sort the data before loading to Vector database up to version 4.2. When unique keys are defined and they are the same as sort keys, direct loader eliminates the duplicates before loading. However, this duplicate key elimination is limited to data being loaded, not to the data existing in the table already. As a result, incremental loads are not supported when sort keys or unique keys are defined on a table.
This loader has certain limitations:
When both Sort Keys and Unique Keys are defined, both should be same, else the loading will fail.
When Sort Key or Unique Key are specified, incremental loads are not supported. This means the target table should be empty, else the loading will fail.
A DataFlow workflow can load to only one version of Vector at a time. When executing workflows that load to different versions of Vector, the KNIME UI should be restarted.
To load data using the direct method, the user should have MONITOR privilege for Vector version 3.0 and DB Admin privilege for Vector version 3.5.
There may be loss of precision when converting double to decimal value. Converting from String to Decimal preserves the precision.
When loading Time values with time zones the timestamp type should be used to guarantee the data is loaded correctly. When the time zone is unspecified such as when using Time types, the local time zone will be used.
vwload, COPY VWLOAD, and SQL COPY
When using either the vwload, COPY VWLOAD, or SQL COPY methods for loading, the LoadActianVector operator reads its input data and stages the data into temporary files in the correct format for loading. This data staging is done in parallel according to the current parallelism set for the application, or specifically for the operator. After the data is staged into the load format, the load procedure is invoked (vwload, COPY VWLOAD, or SQL COPY). The staged files are deleted after the load completes, whether successfully or not.
The source fields can be mapped to the target table columns using a provided field map (see the renameMapping property). If a mapping is not provided, the source data fields will be mapped by position to the target table columns. The mapping happens in schema definition order of the target table. The source field names do not have to match the target columns names.
For example, given a table with three columns:
orderkey (varchar)
quantity (integer)
price (double)
Loading data into this table requires three input fields that match the given types (string, integer, and double). If a field mapping is not provided, the input data will be mapped by position to the target columns. So the first input field will be mapped to column orderkey, the second input field to column quantity, and the third input field to column price. If the input data is not ordered correctly, then the load will fail. Providing a field map specifies which input fields map to which database columns. See the code examples for the details of providing a field map.
Enable rollback to abort a data load after the specified number of errors occur. The errors in question are ones caused by failing to parse records from the staged load files. When a data load is aborted and rollback is enabled, no data will be written to the target table. The maximum errors tolerated defaults to zero. Other critical system errors may abort a load directly without consideration of the max errors property.
Enable the jdbcOnly option to limit the data load to using JDBC exclusively instead of opening additional socket connections to the instance. Currently, this option only applies to COPY VWLOAD. This option can be used if additional ports other than the one used for the JDBC connection cannot be created due to security or firewall restrictions.
To capture the input records that are rejected by vwload, COPY VWLOAD, or SQL COPY load process, set the rejectsPath property. The rejected records will be written to a file at this path. If no rejects are generated, the rejects file will not be created.
The output of vwload, COPY VWLOAD, or the SQL command will be captured and written to the application log at the INFO log level. View this information for details about the load process.
Null Values
When inserting null values with either direct load or vwload, the null settings of the table will be enforced by the loader. This means that an error will be thrown when trying to load a column that is not nullable with a null.
Time and Timestamp Precision
When loading Time and Timestamp columns using vwload, COPY VWLOAD, or SQL COPY the maximum supported precision is milliseconds. If greater precision is required, direct loading should be used, which supports nanosecond precision.
Code Examples
The following code example demonstrates using the LoadActianVector operator using the Java API. The direct method is used. The user and password provided are required to be the DBA account.
Using the LoadActianVector operator in Java (directload)
// Build a map of source field names to target table column names.
// Only these fields will loaded. Other source fields are dropped.
// Target columns not mapped will be null filled.
Map<String, String> fieldMap = new HashMap<String, String>();
fieldMap.put("orderkey", "l_orderkey");
fieldMap.put("partkey", "l_partkey");
fieldMap.put("suppkey", "l_suppkey");
fieldMap.put("linenumber", "l_linenumber");
fieldMap.put("quantity", "l_quantity");
fieldMap.put("extendedprice", "l_extendedprice");

// Create the operator within an application and set the properties.
// Use direct for this load. No need to set maxErrors, rollback is disabled.
LoadActianVector loader = app.add(new LoadActianVector());
loader.setHost("database-server");
loader.setInstance("VW");
loader.setDatabaseName("tpch");
loader.setTableName("lineitem");
loader.setMethod(LoadMethod.DIRECT);
loader.setUser("user");
loader.setPassword("password");
loader.setRenameMapping(fieldMap);
loader.setRollback(false);
The following example demonstrates using the operator in RushScript. Note that because the vwload utility is used, the application must be run on the Vector server.
Using the LoadActianVector Operator in RushScript (vwload)
var fieldMap = {
    'orderkey':'l_orderkey',
    'partkey':'l_partkey',
    'suppkey':'l_suppkey',
    'linenumber':'l_linenumber',
    'quantity':'l_quantity',
    'extendedprice':'l_extendedprice'
};
 
// Load data into Vector using vwload.
dr.loadActianVector(
    data, 
    {
        host:'<IP address or FQDN of the cluster>',
        instance:'VW',
        databaseName:'tpch', 
        tableName:'lineitem', 
        rejectsPath:'rejects.txt', 
        renameMapping:fieldMap, 
        rollback:true, 
        maxErrors:10, 
        method:'VWLOAD', 
        user:'user', 
        password:'password'
 -  });
Properties
The LoadActianVector operator supports the following properties.
Name
Type
Description
charset
String
Determines the character set that is used during staging with VWLOAD method. For the vwload supported character sets, see the Actian Vector documentation.
cleanData
boolean
If this is enabled, additional operations and checks may be performed on the data before loading it into Vector to ensure it meets any table constraints or other requirements. Additionally, invalid values will be loaded as nulls if the table allows instead of producing errors, such as when stringTruncationError or decimalTruncationError are enabled. Default: false.
database
String
The name of the target database.
decimalTruncationError
boolean
Whether an error will be thrown if decimal truncation would occur during database loading. Default: false.
extraProperties
Map(String, String)
A map of key value properties that will be used when creating the connection to the Vector database.
finalizeTableSQL
String
The SQL statement to execute after processing all the records. For example, this could be used to execute a SQL CREATE INDEX statement.
host
String
Name of the Vector host.
initializeTableSQL
String
The SQL statement to execute before processing any records.
insertMode
String
The insert mode (ROW, BULK, or DEFAULT) that applies for Insert and Merge operations when Direct loading. Default: DEFAULT.
instance
String
Name of the Vector instance. Default: "VW"
jdbcOnly
boolean
When jdbcOnly is enabled, the load attempts to use the JDBC connection exclusively instead of remotely executing the command. Default: false.
maxErrors
int
The maximum errors tolerated before aborting and rolling back a data load. Only applicable when rollback is enabled. Default: 0
method
LoadMethod
The load method to use for loading the data. There are currently three choices with the following constraints:
DIRECT: Streams the data directly into the Vector engine. Only a privileged user can use the direct method. The user must also have access to write the target table.
VWLOAD: This utility is used to load the data. The vwload utility must be executed on the Vector server. It is generally a faster method to load data. Only the DBA user can execute vwload.
SQLCOPY: This command can be used remotely and is used to load the data. The Vector client software must be installed and configured for SQL COPY to work. The user does not require DBA permissions but must have copy access to write to the target table.
COPYVWLOAD: This command is used to load the data. It can also be used remotely to initiate execution of vwload through SQL.
password
String
The password for the user account that has write/copy permissions for the target table. This property is always needed for the SQL copy method. It is optionally required for use with vwload. When provided with vwload, it allows the DBA to impersonate the database user for the purpose of loading data into the target table.
port
int
The port used by the Vector instance. Default: 7.
rejectsPath
String
Path to the target file containing rejected records from the data load. If no records are rejected, the file will not be created.
renameMapping
Map<String, String>
A map of source field names to target table column names. If the input data schema matches the target table schema in number of fields, order, and type of the fields, then this property is not needed. If a map is provided source fields that are not mapped are dropped. Likewise, target columns that are not mapped will be null-filled. Non-nullable fields that are not mapped will result in a SQL error.
rollback
boolean
When rollback is enabled, the data load will be aborted and all data rolled back if the max error limit is crossed. If disabled, the data load will continue if errors occur. The rejected records will be written to the rejectsPath file.
sshPassword
String
Operating system password used to connect to NameNode of the Hadoop installation. This operator uses this password to establish an SSH connection to the machine running the master node of the Vector in Hadoop installation.
sshUser
String
Operating system user ID used to connect to NameNode of the Hadoop installation. This operator uses this user ID to establish an SSH connection to the machine running the master node of the Vector in Hadoop installation.
stringTruncationError
boolean
Whether an error will be thrown if string truncation would occur during database loading. Default: false.
table
String
The name of the table that is the target of the data load.
tmpDirectory
String
A temporary directory to be used for storing the intermediate loader files. If not set, it will attempt to use the local default temporary directory. A directory must be specified that exists in the HDFS file system on the target Hadoop cluster when loading Vector on Hadoop with vwload or the load will not use the distributed loader.
user
String
The database user account that has copy permissions to the target database table. This property is always needed for the SQL copy method. It is optionally required for use with vwload. When provided with vwload, it allows the DBA to impersonate the database user for the purpose of loading data into the target table.
vectorSize
int
The size in units of rows to use for buffering input data before constructing Vector blocks. The available native memory impacts the value of this parameter. For tables with wide columns (more than 1000), this parameter should be adjusted down accordingly to avoid OutofMemoryExceptions. Default: 1024. This property is only valid when the load method is set to DIRECT.
Ports
The LoadActianVector operator provides a single input port.
Name
Type
Get method
Description
input
getInput()
The input data to load into a Vector table.
Using the LoadMatrix Operator to Load Actian Matrix
The LoadMatrix operator is used to bulk load data into a Matrix database. The operator provides a single input port containing the data to load into the database instance. Data is loaded directly into the database instance using the ODI capability of the Matrix database. The loading executes in parallel and supports distributed loading when executed on a cluster.
The source fields can be mapped to the target table columns using a provided field map (see the renameMapping property). If a mapping is not provided, the source data fields will be mapped by position to the target table columns. The mapping happens in schema definition order of the target table. The source field names do not have to match the target columns names.
For example, given a table with three columns:
l_orderkey (varchar)
l_quantity (integer)
l_extendedprice (double)
To load data into this table requires three input fields that match the given types (string, integer, and double). If a field mapping is not provided, the input data will be mapped by position to the target columns. So the first input field will be mapped to column l_orderkey, the second input field to column l_quantity, and the third input field to column l_extendedprice. If the input data is not ordered correctly, then the load will fail. Providing a field map specifies which input fields map to which database columns. See the code examples for the details of providing a field map.
The Actian Matrix database supports the concept of slices. A slice represents a unit of parallelism within the database. Each node within a database instance normally supports multiple slices. The loader operator communicates directly with slices to transfer data into the target database. The number of connections made into the database is determined by the parallelism level of the loader operator. The input data is not repartitioned to match the number of target slices. If the parallelism (partition count) of the loader operator is less than the number of database slices, some slices will be left idle during the load process. To repartition the data to match the number of slices in the target database, explicitly set the parallelism level of the loader operator to match the number of database slices.
A component of the data loader executes within the database. This in-database component supports tracing for debugging. The tracing level can be set to values 0 to 9. By default, the trace level is set to 0. To increase the amount of tracing produced in the database by the loader, set the trace level to a positive, non-zero value. The traceLevel property is used to set the Matrix UDF (user defined function) trace level.
Network communication between this DataFlow operator and the Matrix database is accomplished using TCP/IP sockets. The properties startPortRange and endPortRange can be used to limit the range of TCP/IP ports used for this communication. The ports within this range will need to be available for two-way communication. Within an environment containing a firewall, the ports will need to be “opened” within the firewall configuration. The default port range is from 40000 to 41000 (exclusive). Each machine in both the DataFlow cluster and the Matrix cluster must also have a fully qualified host name, and each of these names must be resolvable on all other machines in both clusters, using either DNS or /etc/hosts (or equivalent) files.
When you run multiple workflows concurrently, or run workflows with multiple Load Matrix operators, ensure the port range is large enough to allow at least one open port per slice per operator. Also, ensure Matrix database workload management is configured to allow enough concurrent queries to leave at least one available for each active Load Matrix operator. This availability is configured using the max_concurrent_queries property in ~paraccel/padb/rel/etc/padb_wlm.xml on the Matrix server. You may configure a DataFlow service class with specific properties for DataFlow jobs, which will allow the loader to check for available resources before launching a job, and fail quickly if there are insufficient resources on the Matrix server; without this service class, a job may hang when there are insufficient resources. For more information about configuring this service class, see Installing and Configuring DataFlow for Matrix On-Demand Integration (ODI).
The DataFlow Matrix ODI should be installed on the Matrix database prior to using this operator. The ODI currently supports Actian Matrix versions 4.0, 5.0, and 5.1. For instructions on installing the ODI, see Installing and Configuring DataFlow for Matrix On-Demand Integration (ODI).
A network connection for data transfer is created for each parallel data worker. To limit the number of network connections used, limit the parallelism of the LoadMatrix operator. A limit on connections can be explicitly set on the operator using the maxConnections operator property. See the properties section below for more information.
Note that modifying the parallelism level of the LoadMatrix operator either through the operator setting or the maxConnections property may incur the overhead of data redistribution to match the changed parallelism level. To prevent redistribution, set the overall parallelism of the DataFlow job to limit the number of data connections used into Matrix.
Note:  Setting the maxConnections property may incur the overhead of data redistribution. Data redistribution occurs to gather data from a higher number of data streams to a lower number of data streams within DataFlow.
The operator supports running a SQL command before the data loading starts (initializeTableSQL property) and after the load ends (finalizeTableSQL property). These SQL commands are submitted to the Matrix system using a JDBC connection. If the initial SQL commands fail, the entire load operation will be abandoned and the Dataflow execution will terminate. The final SQL commands are executed after the load successfully takes place. If the final SQL commands fail, an error is logged and execution is halted. However, the data that has been successfully loaded is not rolled back.
Note:  The Time and Time with Time Zone data types were added in Matrix 5.0, but are only supported by the ODI for Matrix versions up to 5.3.1. When loading data into a Matrix 5.0 database, any fields of these types will be populated with null values, and a warning will be generated.
Code Examples
The following code example demonstrates using the LoadMatrix operator using the Java API.
Using the LoadMatrix operator in Java
// Build a map of source field names to target table column names.
// Only these fields will loaded. Other source fields are dropped.
// Target columns not mapped will be null filled.
Map<String, String> fieldMap = new HashMap<String, String>();
fieldMap.put("orderkey", "l_orderkey");
fieldMap.put("partkey", "l_partkey");
fieldMap.put("suppkey", "l_suppkey");
fieldMap.put("linenumber", "l_linenumber");
fieldMap.put("quantity", "l_quantity");
fieldMap.put("extendedprice", "l_extendedprice");

// Create the operator within an application and set the properties.
LoadMatrix loader = app.add(new LoadMatrix ());
loader.setHost("database-server");
loader.setDatabase("test");
loader.setTable("lineitem");
loader.setUser("user");
loader.setPassword("password");
loader.setRenameMapping(fieldMap);
loader.setLogFrequency(100000); // logs progress every 100K records
Using the LoadMatrix Operator in RushScript
var fieldMap = {
    'orderkey':'l_orderkey',
    'partkey':'l_partkey',
    'suppkey':'l_suppkey',
    'linenumber':'l_linenumber',
    'quantity':'l_quantity',
    'extendedprice':'l_extendedprice'
};
 
// Load data into Matrix.
dr.loadMatrix(
    data, 
    {
        host:'leader',
        database:'test', 
        table:'lineitem',
        renameMapping:fieldMap,
        user:'user', 
        password:'password',
        logFrequency:100000
    });
Properties
The LoadMatrix operator supports the following properties
Name
Type
Description
database
String
The name of the target database.
host
String
Host name of the Matrix leader node.
endPortRange
int
The end point (exclusive) of the range of network ports to use for communication between the executing DataFlow operators and the Matrix database for control messages and data transfers. Default: 41000.
finalizeTableSQL
String
The SQL statement to execute after processing all the records. For example, this could be used to execute a SQL CREATE INDEX statement.
initializeTableSQL
String
The SQL statement to execute before processing any records.
logFrequency
int
The frequency to log loading progress, measured in rows processed. Defaults to 0, which indicates no progress logging is provided.
maxConnections
int
Limits the number of data connections made to Matrix for data transfers. Defaults to zero which implies a connection per DataFlow parallel stream. Setting this value may incur the overhead of data redistribution.
password
String
The password for the user account that has write/copy permissions for the target table.
port
int
The port number of the Matrix leader node. Default: 5439, the default value for Matrix.
renameMapping
Map<String, String>
A map of source field names to target table column names. If the input data schema matches the target table schema in number of fields, order, and type of the fields, then this property is not needed. If a map is provided source fields that are not mapped are dropped. Likewise, target columns that are not mapped will be null filled. Non-nullable fields that are not mapped will result in a SQL error.
retryCount
int
The number of times communication requests to the MPP database system are retried before being considered an error. The retry count may need to be increased for large MPP systems with many nodes.
startPortRange
int
The start point (inclusive) of the range of network ports to use for communication between the executing DataFlow operators and the Matrix database for control messages and data transfers. Default: 40000.
table
String
The name of the table that is the target of the data load.
timeoutInterval
int
The timeout interval (in seconds) for communication requests the target MPP database system.
traceLevel
int
A value 0–9 that indicates the level of trace logging configured for the ODI implementation. Setting the trace level to a nonzero value enables debug logging within the database. The logs can be accessed within the database using by querying the STL_UDF_TRACE system table.
user
String
The database user account that has copy permissions to the target database table. This property is always needed for the SQL copy method. It is optionally required for use with vwload. When provided with vwload, it allows the DBA to impersonate the database user for the purpose of loading data into the target table.
Ports
The LoadMatrix operator provides a single input port.
Name
Type
Get method
Description
input
getInput()
The input data to load into the target Matrix table.
Using the MatrixSink Operator to Load Actian Matrix from SQL
The MatrixSink operator is similar to the LoadMatrix operator. Both the operators push data from a DataFlow workflow into a Matrix instance. The difference between the two operators is the invocation. The deployment and invocation of workflows having the MatrixSink operator is run directly from Matrix SQL using a user defined function (UDF).
Overview
Following is the MatrixSink operator framework:
1. You can create a workflow along with a cluster configuration, serialize it to JSON format, and store it in a table in the Matrix database. You can do this using the DataFlow export option in KNIME. For more information, see Exporting an Actian DataFlow Workflow.
Note:  Serialization creates a binary or textual representation of the specified in-memory object.
2. Run the invoke_dataflow UDF from the Matrix psql console or SQL APIs (JDBC) and enter ID of the serialized workflow as an argument. The invoke_dataflow UDF will retrieve the serialized workflow from the table and include the connection information for this Matrix instance into the serialized MatrixSink operator.
3. The invoke_dataflow UDF sends the configured serialized workflow to a REST service available on the DataFlow cluster manager.
4. The REST service deserializes the workflow and deploys it to the cluster.
5. When you run the MatrixSink operator on the cluster, the input data is moved to the Matrix instance (similar to LoadMatrix operator).
/download/attachments/20480498/matrix_sink.overview.png?version=1&modificationDate=1418044770924&api=v2&effects=border-simple%2Cblur-border
The following steps provide the utilization process of the MatrixSink operator:
1. Deploys the invoke_dataflow UDF to the Matrix instance.
2. Creates the workflow and REST service configuration tables in the Matrix database.
3. Populates the tables with JSON serialized workflows, cluster configuration, and REST service configuration.
4. Runs the invoke_dataflow UDF from the Matrix psql console or other APIs (JDBC).
For more information about the data transfer between the workflow and Matrix instance, see Using the LoadMatrix Operator to Load Actian Matrix.
Before You Begin
The invoke_dataflow UDF requires Actian Matrix version to be between 5.0.1 and 5.3.1.
The REST service is included with the cluster manager for DataFlow 6.4 or higher.
Setting Up the Matrix Database
Installing the invoke_dataflow UDF
DataFlow distribution provides installation scripts for Matrix UDFs. After installing ODI-DataFlow-Matrix5-xy.rpm, the scripts are available in /opt/paraccel/bin location. For more information, see Installing and Configuring DataFlow for Matrix On-Demand Integration (ODI).
After running the installation scripts , the invoke_dataflow UDF will be registered with the database.
Note:  UDFs are installed for each database.
Creating the workflow and REST service configuration tables in the database
The invoke_dataflow UDF requires two user space tables. One table stores the connection information for the REST service(s) and the other table stores the actual serialized workflows and references the associated REST service entry. The schema is fixed for both the tables and the table names can be configured. The default table names are workflowconfigs and workflows.
Note:  We recommend to automatically create these tables using the DataFlow export wizard. For more information, see Exporting an Actian DataFlow Workflow.
Workflow REST service config table definition (SQL)
CREATE TABLE workflowconfigs(
  name VARCHAR(128) PRIMARY KEY NOT NULL, --  unique name for this REST service configuration
  url VARCHAR(1024) NOT NULL, -- URL for the REST service workflow submission
  username VARCHAR(128) NOT NULL, -- DataFlow cluster manager user name
  password VARCHAR(128) NOT NULL, -- DataFlow cluster manager password
  startport INT NOT NULL, -- start of the port range used for DataFlow/Matrix internal communication
  endport INT NOT NULL -- start of the port range used for DataFlow/Matrix internal communication
);
If both startport and endport are specified as 0, then the default range (40000–40999) is assumed. However, the port range can be overridden using the UDF parameters upon invocation.
Workflow table definition (SQL)
CREATE TABLE workflows(
  name VARCHAR(128) PRIMARY KEY NOT NULL, -- unique name for this workflow
  workflow VARCHAR(65535) NOT NULL, -- the JSON serialized workflow
  config VARCHAR(128) REFERENCES workflowconfigs(name) NOT NULL -- reference to the workflow REST service config
);
Populating the Matrix Database Tables
Populating the REST service configuration table
A workflow REST service configuration entry has a unique name, the submission URL, user credentials for the DataFlow cluster manager running the service, and the port range. The submission URL pattern is /dispatcher/rest/executeWorkflow/submit.
Workflow REST service config entry example (SQL)
INSERT INTO workflowconfigs(name, url, username, password, startport, endport) VALUES(
  'example cluster',
  'http://foo.example:1100/dispatcher/rest/executeWorkflow/submit',
  'root',
  'changeit',
  40000,
  40999
);
Preparing a workflow
The input required for the workflows table is a LogicalGraph instance and an EngineConfig instance.
Important!  The workflow must have only one instance of the MatrixSink operator.
Note:  The EngineConfig instance will determine the cluster to deploy the workflow. This can be any DataFlow cluster and may not be the cluster with workflow deployed on the REST service. If the EngineConfig does not have the cluster configuration, then the workflow is run in plain, non-cluster mode on the REST service.
Populating the workflow table
A workflow entry has a unique name, the JSON serialized workflow with its cluster configuration settings, and a reference to the workflow REST service configuration to use.
Note:  We recommend you use the Exporting an Actian DataFlow Workflow to automatically create workflow entries from KNIME DataFlow projects.
The JSON serialized workflow can be obtained from wrapping a LogicalGraph and an EngineConfig in a LogicalGraphWithConfig and passing it into JSON serialization.
JSON serialization example (Java)
public String serialize(LogicalGraph graph, EngineConfig config) {
 LogicalGraphWithConfig graphWithConfig = new LogicalGraphWithConfig(config, graph);
 return new JSON().format(graphWithConfig);
}
The resulting serialized string is inserted into the workflows table.
Workflow insertion example (Java)
public int deployWorkflow(Connection conn, String name, String workflow, String configName) throws SQLException {
  String sql = "INSERT INTO workflows (name, workflow, config) VALUES (?, ?, ?)";
  PreparedStatement stat = conn.prepareStatement(sql);
  stat.setString(1, name);
  stat.setString(2, workflow);
  stat.setString(3, configName);
  return stat.executeUpdate();
}
You can also insert into the workflows table in SQL as well.
Workflow insertion example (SQL)
INSERT INTO workflows (name, workflow, config) VALUES (
  'example workflow',
  '{"engineConfig":{...},"logicalGraph":{...}}',
  'example cluster'
);
However, the easiest way to populate the workflows table is using the Exporting an Actian DataFlow Workflow.
Invoking a Workflow from Matrix SQL
invoke_dataflow syntax (SQL)
SELECT * FROM invoke_dataflow (WITH
  workflow('test_workflow') -- required, the ID of the workflow to be executed in the workflow table
  target('testtable') -- required, an existing table on this DB specifying the expected data schema
  workflow_table('myworkflows') -- optional, table containing the serialized workflows, default: 'workflows'
  config_table('myconfigs') -- optional, table containing the REST service config, default: 'workflowconfigs'
  start_port_range(42000) -- optional, start of the server socket port range, overrides spec from config table
  end_port_range(42999) -- optional, end of the server socket port range, overrides spec from config table
);
The invoking will retrieve the entry with ID 'test_workflow' from the myworkflows table and send the serialized workflow to the REST service specified by the associated myconfigs table entry. The schema of the input to the MatrixSink operator should match the schema of the Matrix database table testtable. Both end points of the internal communication (the MatrixSink operator running in the workflow on the cluster and the invoke_dataflow UDF on the Matrix instance) use the specified port range when opening outbound listening sockets.
A DataFlow record schema matches a Matrix table schema if all the table column names are present in the record schema and the resulting field/column pairs have compatible types.
The Matrix column names are normalized to lowercase.
Fields in the record schema that are not present in the table schema are ignored.
The table specified by the target parameter is only used as a schema template. The input data is not stored automatically in this table. To move data into this or other table, use
SELECT * INTO <NewTable> FROM invoke_dataflow(WITH [...]);
or
INSERT INTO <ExistingTable> (<column_a> [,...]) SELECT <column_a> [,...] FROM invoke_dataflow (WITH [...]);
The workflow_table and config_table parameters are optional, but tables for workflow and config must exist in the database—if the parameters are omitted, the tables are supposed to have the respective default names.
Troubleshooting
The following provides items to verify while troubleshooting:
Error messages from the REST service is passed to the UDF and displayed on the Matrix psql console.
Detailed UDF logging information is available in the STL_UDF_TRACE Matrix system table. For more information, see the Actian Matrix documentation.
DataFlow cluster manager logs provides details.
Ensure that the invoke_dataflow UDF is properly installed. For example, check '\df' from the psql console. For more information, see the Actian Matrix documentation.
Ensure that the hostname of both Matrix host and DataFlow cluster host resolve to a publicly accessible IP or network interface.
Ensure that the Matrix host can access the REST service host over the network and vice versa.
Ensure that the Matrix host can be accessed over the network from the DataFlow cluster host and vice versa.
Ensure that the workflow contains only one instance of the MatrixSink operator.
Verify the settings in the workflow REST service config table.
Code Examples
The following code example provides using the MatrixSink operator in the Java API.
Using the MatrixSink operator in Java
MatrixSink matrixSink = graph.add(new MatrixSink());
The MatrixSink operator does not depend on any defined configuration. All the settings entered by the invoke_dataflow UDF when the workflow is deployed to the REST service.
Properties
Note:  The MatrixSink operator configurations cannot be defined. We recommend you use the default configurations. Any configuration that can be set on this operator is public only for implementation.
Ports
The MatrixSink operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
The input data to bulk load into Matrix.
HBase Operators
Using the DeleteHBase Operator to Delete from HBase
Using the ReadHBase Operator to Read from HBase
Using the WriteHBase Operator to Write to HBase
Using the DeleteHBase Operator to Delete from HBase
The DeleteHBase operator writes delete markers to a table in HBase.
A DeleteMarker can be mapped to a select HBase cell or it can be mapped to a family as a sub table:
1. mapCellMarker(java.lang.String, java.lang.String, com.pervasive.datarush.hbase.DeleteHBase.DeleteMarker)– Map a DeleteMarker to a select cell within a column family. A row key field is required to uniquely identify cells. A DeleteMarker will be inserted for each mapped cell qualifier, for each input record.
2. mapFamilyMarker(java.lang.String, com.pervasive.datarush.hbase.DeleteHBase.DeleteMarker)– Map a DeleteMarker to a column family. If mapping DeleteMarker.DeleteFamily, then only a row key field is required; otherwise both a row key field and a qualifier key field are required to uniquely identify cells. A single DeleteMarker will be inserted for each input record.
A time key field can optionally be specified to allow the user to provide a timestamp value as part of the input record. If a time key field is not specified, then each record will default to the current time. If mapping DeleteMarker.Delete to delete a specific version of a cell, then a time key field is required to uniquely identify cells. DeleteMarkers that delete past versions will use the time key field or default to current time.
The input will be repartitioned using HBase table region row key ranges. Each partition will sort its DeleteMarkers in row-key ascending, qualifier-key ascending, time-key descending order, and then write the DeleteMarkers to the appropriate regions.
If the specified HBase table does not exist, it will be created. The number of regions created will be MAX (4, the level of parallelism).
Code Example
The following example demonstrates using the DeleteHBase operator to delete cells in a table in HBase.
Using the DeleteHBase Operator
DeleteHBase delete = graph.add(new DeleteHBase());
delete.setTableName("table");
delete.setRowFieldName("rowkey");
delete.mapCellMarker("family1", "qualifier1", DeleteMarker.DeleteColumn);
delete.mapCellMarker("family1", "qualifier2", DeleteMarker.DeleteColumn);
delete.mapCellMarker("family2", "qualifier1", DeleteMarker.DeleteColumn);
graph.connect(data.getOutput(), delete.getInput());
Using the DeleteHBase Operator in RushScript
var markerMap = {
    'family1':{'qualifier1':'DeleteColumn', 'qualifier2':'DeleteColumn'},
    'family2':{'qualifier1':'DeleteColumn'}
}
var data = dr.deleteHBase(
                data, 
                {
                    tableName:'table', 
                    rowFieldName:'rowKey',
                    cellMarkerMap:markerMap
                });
Properties
The following table provides the properties supported by DeleteHBase operator.
Name
Type
Description
cellMarkerMap
Map<String, Map<String, DeleteMarker>>
Maps DeleteMarkers to selected cells within column families. A DeleteMarker is inserted for each mapped cell qualifier and each input record.
configuration
HDFSConfiguration
The HDFS configuration used to connect to HBase.
familyMarkerMap
Map<String, DeleteMarker>
Maps DeleteMarkers to selected column families. A DeleteMarker is inserted for each family and each input record.
outputPath
org.apache.hadoop.fs.Path
The output path that sets the location used to create HFiles. Default: hdfs://user/userName/.
qualifierFieldName
String
The qualifier key field name.
rowFieldName
String
The row key field name.
tableName
String
The HBase table name.
timeFieldName
String
The version timestamp field name. This is optionally used to designate a version timestamp field. Default: current time
Ports
The DeleteHBase operator provides a single input port.
Name
Type
Get method
Description
input
getInput()
The input data to DeleteHBase.
Using the ReadHBase Operator to Read from HBase
The ReadHBase operator is used to read a result set from a table in HBase. The result set is specified by various field mappings and can be filtered to only return a specific time range.
When running with parallelism enabled, each partition will read its assigned regions one region at a time in row-key order. This guarantees that records within the same region will not span a partition, and that each partition output is returned in row-key, qualifier-key order.
Specifying Field Mappings
Because of the nature of NoSQL databases , there may or may not be a particular schema that can be used across all the rows of a given table in HBase. Therefore, the field mappings when reading from HBase must be specified after adding the operator to a logical graph.
Within HBase a cell is uniquely identified by a tuple consisting of {row, column, timestamp}. The cell at a particular row and column with the latest timestamp is considered the latest version of a given cell. Since HBase stores multiple previous versions of a given cell, the default behavior of the operator returns only the latest version of any retrieved cell. Cell versions can be filtered by specifying the time range with the associated properties.
Because HBase by default stores all of its data as pure binary, the ReadHBase operator will use a catalog table to keep track of the data types that the raw data should be converted into. The various conversion methods for specific DataFlow types are listed in the following table.
Java type
DataFlow type
Conversion method
int
TokenTypeConstant.INT
org.apache.hadoop.util.Bytes.toBytes(int)
long
TokenTypeConstant.LONG
org.apache.hadoop.util.Bytes.toBytes(long)
float
TokenTypeConstant.FLOAT
org.apache.hadoop.util.Bytes.toBytes(float)
double
TokenTypeConstant.DOUBLE
org.apache.hadoop.util.Bytes.toBytes(double)
BigDecimal
TokenTypeConstant.NUMERIC
org.apache.hadoop.util.Bytes.toBytes(BigDecimal)
String
TokenTypeConstant.STRING
org.apache.hadoop.util.Bytes.toBytes(String)
byte[]
TokenTypeConstant.BINARY
N/A - store/retrieve byte array as is
All other data types will be serialized or deserialized using the default DataFlow formats. If the entry for a particular table is missing in the catalog, the data will be returned as byte arrays. This allows the user to manually apply any required conversions.
Since HBase is a column-oriented database, the column portion of the index key consists of two parts: a column family and a column qualifier. The column family identifies one of several families that are determined when the table is initially created. Column families provide a way to logically and physically partition columns into groups such that the cells associated with a particular family are stored together in the same files on disk. The column qualifier uniquely identifies a cell and its previous versions within a column family.
DataFlow fields can be mapped to HBase cells in one of two ways:
mapCell()—Map individual cells within a column family. Cells within a family can be of heterogeneous types and only the mapped cells are accessed. The mapped fields will be together in a single record. An optional row-key field can be specified to uniquely identify each DataFlow flow record.
mapFamily()—Map all cells within a column family as a subtable. All cells within a family are of homogeneous types and all cells are accessed. Each cell within a family is contained in an individual record. Optional row-key, qualifier-key fields can be specified to uniquely identify each DataFlow flow record.
In both cases a cell can contain a single field or a record of fields. Mapping a single cell as a record of fields will allow multiple fields to be packed into a single cell thus greatly increasing I/O performance at the expense of reduced version granularity. All fields packed together in a single cell are versioned together and therefore all fields must be present when writing. The default DataFlow serialization is used exclusively in this case.
Using HCatalog
Field mappings can also be derived from a schema stored in HCatalog. If a table’s schema is stored in HCatalog, the ReadHBase operator only needs to be directed to this HCatalog table; the HBase table name and field mappings are not required. Individual cells within a column family will be mapped, as defined in the HCatalog table mapping. The resulting DataFlow fields will have the same names as the fields in HCatalog.
To use HCatalog with the ReadHBase operator, specify the HCatalog database name and HCatalog table name where the schema is stored, rather than specifying an HBase table name and field mapping. Optionally, specify a list of fields, as named in HCatalog, to read from the table; if this list is not specified, all fields mapped in HCatalog will be read from the table.
Hadoop Configuration Properties
An HBase cluster has the following minimum connection configuration properties:
fs.default.name
The HDFS URL (such as hdfs://headnode:8020)
hbase.rootdir
The root directory where HBase data is stored (such as hdfs://headnode:8020/hbase)
hbase.zookeeper.quroum
The nodes running ZooKeeper
hbase.zookeeper.property.clientPort
The ZooKeeper client port.
Additionally, other properties may occasionally need to be defined:
hive.metastore.uris
The Hive Metastore URI, if using HCatalog (such as thrift://headnode:9083)
zookeeper.znode.parent
The parent znode in ZooKeeper used by HBase. This should be defined if the default value of /hbase is not being used by your cluster. (For some Hortonworks clusters, you may need to define this property with a value of /hbase-unsecure.)
Code Example
The following example demonstrates using the ReadHBase operator to read an entire table in HBase. It uses the typical map cell as field mapping to read the table.
Using the ReadHBase Operator in Java
ReadHBase reader = graph.add(new ReadHBase());
reader.setTableName("HBaseTable");
reader.setRowFieldName("rowkey");
reader.mapCell("family1", "qualifier1", "data1");
reader.mapCell("family1", "qualifier2", "data2");
reader.mapCell("family2", "qualifier1", "data3");
graph.connect(reader.getOutput(), operator.getInput());
Using the ReadHBase Operator in RushScript
var data = dr.readHBase({tableName:'HBaseTable', rowFieldName:'rowkey'});
The following example demonstrates using the ReadHBase operator with HCatalog.
Using the ReadHBase Operator with HCatalog in Java
ReadHBase reader = graph.add(new ReadHBase());
reader.setHCatalogDatabase("HCatDatabase");
reader.setHCatalogTable("HCatTable");
graph.connect(reader.getOutput(), operator.getInput());
Using the ReadHBase Operator with HCatalog in RushScript
var data = dr.readHBase({hcatalogDatabase:'HCatDatabase', hcatalogTable:'HCatTable'});
Properties
The ReadHBase operator supports the following properties.
Name
Type
Description
startTime
java.util.Date
The time range filter start time.
endTime
java.util.Date
The time range filter end time.
tableName
String
The HBase table name.
rowFieldName
String
The row key field name. Only required when reading from two or more families.
qualifierFieldName
String
The qualifier key field name. Only required when reading from two or more families using the map family mapping.
timeFieldName
String
The version timestamp field name. Only required when versionCount > 1.
versionCount
long
The cell version count. Default: 1.
hCatalogDatabase
String
The HCatalog database name. Only required when reading a table whose schema is stored in HCatalog.
hCatalogTable
String
The HCatalog table name. Only required when reading a table whose schema is stored in HCatalog.
hCatalogFields
String...
A list of fields (as named in HCatalog) to read from the table. Only applicable when reading a table whose schema is stored in HCatalog. If not specified, all fields will be read.
configuration
HDFSConfiguration
The HDFS configuration used to connect to the HBase.
Ports
The ReadHBase operator provides a single output port.
Name
Type
Get method
Description
output
getOutput()
Provides the output record data.
Using the WriteHBase Operator to Write to HBase
The WriteHBase operator is used to write a result set to a table in HBase. If the target table for the write does not exist, it will be automatically created and an entry added to the catalog table to keep track of the current table’s schema. The input set and how it should be mapped to a table in HBase is specified by various field mappings defined within the operator. For more information on how to specify the HBase operator’s field mappings, see the section on specifying field mappings in Using the ReadHBase Operator to Read from HBase.
When writing to a table in HBase the default behavior will generate a unique binary row key for every inserted record. The user may optionally specify a row key input field within the data set, in which case the input will be repartitioned using HBase table region row-key ranges. Each partition will individually sort blocks of rows using the row and qualifier keys and region boundaries, and will write the rows to the appropriate regions. If the row key is not specified, the input will be written to regions local to the partition and no repartitioning will be performed. A row key input field must be one of the following types to allow serialization to be performed:
TokenTypeConstant.LONG
TokenTypeConstant.STRING
TokenTypeConstant.BINARY
A unique qualifier key will also be generated for each record if the user maps a family as a subtable and does not specify a qualifier key input field. The qualifier key is generated in a similar manner as the row key, and has the same type restrictions.
A timestamp field can optionally be specified to allow the user to provide a timestamp value as part of the input record. If a timestamp field is not specified, then each record will default to the current time. In both cases the timestamp value is narrowed to millisecond resolution to match HBase and will be advanced slightly to uniquely identify cells with duplicate row/qualifier keys and timestamp values. This ensures the operator is tolerant of duplicate cell versions in the input stream as long as they occur infrequently. Importing large numbers of duplicate cell versions in a short amount of time (more than thousands of duplicates per second) may result in significant time skew to maintain uniqueness.
Using HCatalog
The WriteHBase operator can use HCatalog in two ways.
If the target table already defined in HCatalog, similarly to the ReadHBase operator, no mapping needs to be defined; instead, it will be read from HCatalog. Define the HCatalog database and table name. Optionally, list the fields you would like to write to the table; if this property is not defined, all fields will be written.
If the target table has not yet been defined in HCatalog, a mapping will need to be provided. However, by providing an HCatalog database and table name, a new entry for this table will be created in HCatalog, which can be used for subsequent reads and writes of this table without manually specifying a mapping. If a list of fields is specified, only these fields will be added to the HCatalog schema; otherwise, all mapped fields are included.
The HCatalog field names will be matched to the DataFlow field names. These field names must be valid for HCatalog: containing only lowercase and numeric characters. If necessary, Using the DeriveFields Operator to Compute New Fields can be used to rename input fields to match these requirements.
Hadoop Configuration Properties
An HBase cluster has the following minimum connection configuration properties:
fs.default.name
The HDFS URL (such as hdfs://headnode:8020)
hbase.rootdir
The root directory where HBase data is stored (such as hdfs://headnode:8020/hbase)
hbase.zookeeper.quroum
The nodes running ZooKeeper
hbase.zookeeper.property.clientPort
The ZooKeeper client port.
Additionally, other properties may occasionally need to be defined:
hive.metastore.uris
The Hive Metastore URI, if using HCatalog (such as thrift://headnode:9083)
zookeeper.znode.parent
The parent znode in ZooKeeper used by HBase. This should be defined if the default value of /hbase is not being used by your cluster. (For some Hortonworks clusters, you may need to define this property with a value of /hbase-unsecure.)
Code Example
The following example demonstrates using the WriteHBase operator to write a table in HBase. It uses the typical map cell as field mapping to write the table.
Using the WriteHBase operator
WriteHBase writer = graph.add(new WriteHBase());
writer.setTableName("data");
writer.mapCell("family1", "qualifier1", "data1");
writer.mapCell("family1", "qualifier2", "data2");
writer.mapCell("family2", "qualifier1", "data3");
graph.connect(data.getOutput(), writer.getInput());
The following example demonstrates using the WriteHBase operator to write to a new HCatalog table.
Using the WriteHBase operator with a new HCatalog table
WriteHBase writer = graph.add(new WriteHBase());
writer.setTableName("data");
writer.mapCell("family1", "qualifier1", "data1");
writer.mapCell("family1", "qualifier2", "data2");
writer.mapCell("family2", "qualifier1", "data3");
writer.setHCatalogDatabase("hCatDatabase");
writer.setHCatalogTable("hCatTable");
graph.connect(data.getOutput(), writer.getInput());
The following example demonstrates using the WriteHBase operator to write to an existing HCatalog table.
Using the WriteHBase operator with an existing HCatalog table
WriteHBase writer = graph.add(new WriteHBase());
writer.setHCatalogDatabase("hCatDatabase");
writer.setHCatalogTable("hCatTable");
graph.connect(data.getOutput(), writer.getInput());
Properties
The WriteHBase operator supports the following properties.
Name
Type
Description
configuration
HDFSConfiguration
The HDFS configuration used to connect to HBase.
hCatalogDatabase
String
The HCatalog database name. Only required when writing a table’s schema to HCatalog or when writing to a table whose schema is already defined in HCatalog.
hCatalogTable
String
The HCatalog table name. Only required when writing a table’s schema to HCatalog or when writing to a table whose schema is already defined in HCatalog.
hCatalogFields
String...
The fields (as named in HCatalog) to write to the table. Only applicable when writing a table's schema to HCatalog or when writing to a table whose schema is already defined in HCatalog. If not specified, all fields are written.
qualifierFieldName
String
The qualifier-key field name.
rowFieldName
String
The row-key field name.
timeFieldName
String
The version timestamp field name. This is optionally used to designate a version timestamp field. Default current time.
Ports
The WriteHBase operator provides a single input port.
Name
Type
Get method
Description
input
getInput()
The input data to be written to HBase.