Performing I/O Operations
The DataFlow operator library contains several prebuilt I/O operators. This section covers each of those operators and provides details on how to use them.
Using ByteSource Implementations
The ByteSource interface defines a way to obtain binary data from data sources for downstream decoding, parsing, and data extraction. Separating this interface from the specific read operators allows readers to access source data in many different ways without needing to write a new operator for each possible access path.
File-based readers expose convenience methods for setting the ByteSource to use as either a Path or a String containing the path to the files to access. Underneath, the Path and String implementations are converted into ByteSource objects for use in the lower level I/O functions.
In general, the Path or String interfaces for the file-based read operators will provide the functionality needed. For example, wildcard characters can be used to specify a pattern of files to read. Likewise, passing in a directory implies all files in the directory should be read as one logical unit. However, sometimes it will be necessary to use a ByteSource directly. The ByteSource implementations are covered in detail with usage examples in this section.
For more information about the ByteSource interface and the classes that implement it, see the JavaDoc reference documentation.
BasicByteSource
A BasicByteSource represents reading data from a single Path that can be provided as either a Path or String parameter. If the Path is a directory, it represents reading from all files in the directory. These interfaces provide no more functionality than using a reader directly with a Path for the path name.
A BasicByteSource will also detect compressed data automatically based on file suffixes, although additional constructors provide the ability to specifically set the compression format.
A constructor is also provided that allows specifying an action to take if an input file cannot be read. This is useful for situations where a directory path name is given and a subset of the files contained within it cannot be accessed for reading.
Following is an example of creating a byte source that supports compression (meaning the data being read is already compressed and should be decompressed during the reading process).
Using the BasicByteSource operator with Compression
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.Paths;
import com.pervasive.datarush.io.compression.CompressionFormats;
import com.pervasive.datarush.operators.io.BasicByteSource;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.sink.LogRows;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;
/**
* Read a file that is compressed.
*
*/
public class ReadCompressedRatings {
public static void main(String[] args) {
// Create an empty logical graph
LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ReadRatings");
// List the supported compression formats
System.out.println("supported formats: " + CompressionFormats.listSupportedFormats());
// Create the byte source for the compressed data
BasicByteSource source =
new BasicByteSource(
Paths.asPath("data/ratings.txt.gz"),
CompressionFormats.lookupFormat("gzip"));
// Create the reader using the byte source
ReadDelimitedText reader = graph.add(new ReadDelimitedText());
reader.setSource(source);
reader.setFieldSeparator("::");
reader.setHeader(true);
RecordTokenType ratingsType = record(
INT("userID"),
INT("movieID"),
INT("rating"),
STRING("timestamp"));
reader.setSchema(TextRecord.convert(ratingsType));
// Log results
LogRows logger = graph.add(new LogRows(1));
graph.connect(reader.getOutput(), logger.getInput());
// Compile and run the graph
graph.run();
}
}
Notes about the previous example:
• The list of supported compression formats is printed out. Compression formats are extensible, so custom formats can be added as needed.
• A BasicByteSource is constructed. In this case we need to provide a Path to the files to read and the compression format to use. The helper class CompressionFormats is used to obtain the CompressionFormat implementation for the "gzip" compression type. Because the source name here ends with ".gz", providing a format is not strictly required; the ByteSource would detect the format automatically in this case.
• The created byte source is passed to the reader as the source property. The reader will use the byte source to obtain binary data for decoding, parsing, and data extraction. The data source will decompress the input data before passing it on to the reader. This allows the reader to be agnostic as to what pre-processing must take place on the data before data parsing is applied.
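The idea of a source that decompresses data before the reader sees it can be sketched with JDK classes alone. The following is a minimal illustration, not DataFlow code: the class SuffixDecompressionSketch and its methods are hypothetical names, and the suffix check only approximates the automatic detection described above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical illustration only; not part of the DataFlow API. */
class SuffixDecompressionSketch {

    /** Compress text with gzip so the sketch has compressed input to read. */
    static byte[] gzip(String text) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
                out.write(text.getBytes(StandardCharsets.UTF_8));
            }
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Wrap the raw bytes in a decompressor only when the name ends in ".gz". */
    static InputStream open(String name, byte[] raw) {
        try {
            InputStream in = new ByteArrayInputStream(raw);
            return name.endsWith(".gz") ? new GZIPInputStream(in) : in;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** The "reader" side: consumes plain bytes, unaware of any compression. */
    static String readAll(String name, byte[] raw) {
        try (InputStream in = open(name, raw)) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String line = "1::10::5::1000"; // made-up sample record
        System.out.println(readAll("ratings.txt.gz", gzip(line)));
        System.out.println(readAll("ratings.txt", line.getBytes(StandardCharsets.UTF_8)));
    }
}
```

Both calls in main print the same record, which is the point: the consuming code is identical whether or not the input was compressed.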
ConcatenatedByteSource
A ConcatenatedByteSource combines several ByteSource objects together into a single ByteSource. When read, the source produces the data from each underlying source in turn, resulting in a concatenation of the contents.
A typical usage is to read a provided list of files. Usually a list of files can be read by either reading all of the files in a directory or using wildcards in file names to read files that match a specific pattern. At times this functionality may not be sufficient. In this case, a list of files can be generated and used to construct a ConcatenatedByteSource.
As with the BasicByteSource, once constructed, the byte source is handed to a reader as the source property. The reader then consumes the bytes of the files from the source and processes them as needed.
Following is an example using a ConcatenatedByteSource. The example is contrived because the files are within the same directory and could have been specified using a wildcard pattern in the file name.
Using the ConcatenatedByteSource operator
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.LONG;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.TIMESTAMP;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import java.util.Arrays;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.Path;
import com.pervasive.datarush.io.Paths;
import com.pervasive.datarush.operators.io.ByteSource;
import com.pervasive.datarush.operators.io.ConcatenatedByteSource;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.sink.LogRows;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;
/**
* Read a list of files as one logical unit.
*
*/
public class ReadMultipleFiles {
public static void main(String[] args) {
// Create an empty logical graph
LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ReadStaging");
// Create the byte source for the list of files to read
Path[] paths = new Path[] {
Paths.asPath("data/logs1.txt"),
Paths.asPath("data/logs2.txt"),
Paths.asPath("data/logs3.txt"),
};
ByteSource source = ConcatenatedByteSource.getConcatenatedSource(Arrays.asList(paths));
// Create the reader using the byte source
ReadDelimitedText reader = graph.add(new ReadDelimitedText());
reader.setSource(source);
reader.setFieldSeparator("|");
RecordTokenType type = record(
STRING("entityID"),
TIMESTAMP("timestamp"),
LONG("siteID"),
INT("compromised"),
LONG("eventID"));
reader.setSchema(TextRecord.convert(type));
// Log results
LogRows logger = graph.add(new LogRows(1));
graph.connect(reader.getOutput(), logger.getInput());
// Compile and run the graph
graph.run();
}
}
Notes about the ConcatenatedByteSource example:
• A list of Path objects is assembled representing the files to read. This list can be built applying whatever means necessary. The example is building a simple list of files that exist in the same directory. However, the list of files could be from different directories with different naming schemes. The data contained in the files must all be of the same expected format; if a header were present, it would need to appear in each file.
• The list of Path objects is converted into a ByteSource using the static method getConcatenatedSource(). This method creates the ConcatenatedByteSource from the list of Paths.
• The byte source is set as the source property on the reader.
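The concatenation behavior itself, where each underlying source is drained in turn, has a close analogue in the JDK's SequenceInputStream. The sketch below uses in-memory strings in place of files; ConcatenationSketch and readConcatenated are hypothetical names, and nothing here reflects how ConcatenatedByteSource is implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical illustration only; not part of the DataFlow API. */
class ConcatenationSketch {

    /** Read several in-memory "files" back to back as one logical stream. */
    static String readConcatenated(List<String> contents) {
        List<InputStream> streams = new ArrayList<>();
        for (String content : contents) {
            streams.add(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)));
        }
        // SequenceInputStream drains each stream in order before moving to the next
        try (InputStream all = new SequenceInputStream(Collections.enumeration(streams))) {
            return new String(all.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Made-up sample records standing in for logs1.txt and logs2.txt
        System.out.print(readConcatenated(List.of("a|1\n", "b|2\n")));
    }
}
```

Because the contents are simply appended, a file that lacks a trailing record separator would run into the first record of the next file, which is one reason all files need the same expected format.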
GlobbingByteSource
A GlobbingByteSource applies a file globbing pattern to the file name within a file path. Globbing is the capability of using wildcard characters within a file name to match the pattern to files in an actual file system. After the globbing is accomplished, a GlobbingByteSource acts similarly to a ConcatenatedByteSource, using the list of filtered paths as the files to be read as one logical unit.
The interfaces into the file read operators allow setting the source as a String. When this interface is used, the given String may contain any supported wildcard characters. The readers use a GlobbingByteSource in that case to resolve the actual files to be read.
The only reason then to use a GlobbingByteSource directly is to specify how to handle files that match the globbing pattern but are unreadable.
The following code example uses a GlobbingByteSource directly, setting the action to take when a source file is unreadable.
Using the GlobbingByteSource operator
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.LONG;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.TIMESTAMP;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.operators.io.ByteSource;
import com.pervasive.datarush.operators.io.GlobbingByteSource;
import com.pervasive.datarush.operators.io.UnreadableSourceAction;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.sink.LogRows;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;
/**
* Read a list of files as one logical unit.
*
*/
public class ReadGlobbedFiles {
public static void main(String[] args) {
// Create an empty logical graph
LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ReadStaging");
// Create a globbing byte source using a simple wildcard
ByteSource source = new GlobbingByteSource("data/logs?.txt", UnreadableSourceAction.FAIL);
// Create the reader using the byte source
ReadDelimitedText reader = graph.add(new ReadDelimitedText());
reader.setSource(source);
reader.setFieldSeparator("|");
RecordTokenType type = record(
STRING("entityID"),
TIMESTAMP("timestamp"),
LONG("siteID"),
INT("compromised"),
LONG("eventID"));
reader.setSchema(TextRecord.convert(type));
// Log results
LogRows logger = graph.add(new LogRows(1));
graph.connect(reader.getOutput(), logger.getInput());
// Compile and run the graph
graph.run();
}
}
Notes about the GlobbingByteSource example:
• The GlobbingByteSource is created with a file path containing a wildcard pattern for the file name. The action for unreadable source files is set to FAIL. Any file that matches the pattern but cannot be read will cause an exception.
• The byte source is used as the source property for the reader. The reader will access all of the files matching the pattern as one logical unit of data.
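To see what a pattern such as logs?.txt selects, the matching step can be tried in isolation with the JDK's PathMatcher. This is only a sketch: GlobMatchSketch and matching are hypothetical names, and the glob syntax of java.nio.file is assumed to be close to, but not necessarily identical with, the wildcards GlobbingByteSource supports.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not part of the DataFlow API. */
class GlobMatchSketch {

    /** Return the file names that match the glob, preserving input order. */
    static List<String> matching(String glob, List<String> fileNames) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        List<String> matched = new ArrayList<>();
        for (String name : fileNames) {
            if (matcher.matches(Paths.get(name))) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // "?" matches exactly one character, so logs10.txt and logs.txt are excluded
        System.out.println(matching("logs?.txt",
                List.of("logs1.txt", "logs2.txt", "logs10.txt", "logs.txt")));
    }
}
```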
Using ByteSink Implementations
The ByteSink interface defines a way to abstractly write byte data from a writer to a target file or set of files. Separating this interface from file-based writer operators allows writer implementations to target data without direct knowledge of where or how the data is being written.
File-based writers expose convenience methods for setting the ByteSink to use as either a Path or a String containing the path to the files to write. Underneath, the Path and String implementations are converted into ByteSink objects for use in the lower level I/O functions.
In general, the Path or String interfaces for the file-based write operators provide the functionality needed. For example, passing in a directory implies each active stream of data in the application should write a unique file in the directory. This allows parallel writing to take place.
The only time the need arises to use a ByteSink directly is to compress the output data before writing. The BasicByteSink implementation is covered in detail below with usage examples.
For more information about the ByteSink interface and the classes that implement it, see the JavaDoc reference documentation.
BasicByteSink
The BasicByteSink implementation specifies the use and type of compression when writing the target files. The following code example shows how to create a BasicByteSink specifying the type of compression to use. After the sink is created, it can be passed to a file write operator as the Sink property.
Using the BasicByteSink operator
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.Paths;
import com.pervasive.datarush.io.WriteMode;
import com.pervasive.datarush.io.compression.CompressionFormats;
import com.pervasive.datarush.operators.io.BasicByteSink;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;
/**
* Write the ratings file with gzip compression.
*/
public class WriteCompressedRatings {
public static void main(String[] args) {
// Create an empty logical graph
LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ReadStaging");
// Create the reader using the byte source
ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
reader.setFieldSeparator("::");
reader.setHeader(true);
RecordTokenType ratingsType = record(
INT("userID"),
INT("movieID"),
INT("rating"),
STRING("timestamp"));
reader.setSchema(TextRecord.convert(ratingsType));
// Write the ratings using compression
BasicByteSink sink =
new BasicByteSink(
Paths.asPath("results/ratings.txt.gz"),
CompressionFormats.lookupFormat("gzip"));
WriteDelimitedText writer = graph.add(new WriteDelimitedText(sink, WriteMode.OVERWRITE));
writer.setFieldSeparator("::");
writer.setHeader(true);
writer.setFieldDelimiter("");
writer.setWriteSingleSink(true); // Write a single output file (loss of parallelism)
// Connect reader and writer
graph.connect(reader.getOutput(), writer.getInput());
// Compile and run the graph
graph.run();
}
}
Notes about the example:
• A BasicByteSink is constructed using a Path and the "gzip" compression format.
• The sink with compression enabled is passed as the Sink property to the writer’s constructor.
• When the graph is executed, the data will be written in delimited text format and compressed using "gzip" compression.
Read Operators
Using the ReadAvro Operator to Read Apache Avro Data
The ReadAvro operator reads a data file previously written using the Apache Avro serialization format. The Avro format is a commonly used binary format that offers data compression and the ability to be parsed in parallel. Metadata about the data, such as its schema and compression format, is serialized into the file, making it available to readers.
The operator will translate the Avro schema into appropriate DataFlow types when possible; some schemas are not supported for reading, as described later.
As DataFlow operates on records, it is generally expected that the source data will have a RECORD schema type. If this is not the case, the operator treats the schema as if it were a record with a single field named "value."
The output record type will have fields with the same names and in the same order as the source schema. Output fields are assigned a type based on the schema of the source field with the same name. In general, Avro schema types are assigned DataFlow types according to the following table.
For types not listed previously, the schema type may or may not be mapped to a DataFlow type. If attempting to read a source with schema types that cannot be mapped to DataFlow types, the operator will produce an error. The conditions under which other schema types are supported are as follows:
• Source fields with ARRAY or MAP schema types are never supported.
• Source fields with a RECORD schema type are supported only when reading Avro data written using the WriteAvro operator; fields with DataFlow types that do not have analogues in Avro are written as nested records. Source fields using these same schemas will be mapped back into the original DataFlow type.
• Source fields with a UNION schema type are supported only if the union consists of exactly two schema types, one of which is NULL. In this case, the type is determined using the non-NULL schema type of the union.
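The UNION rule in the list above can be written out as a small function. This sketch models schema types as plain strings for brevity; UnionRuleSketch and resolveUnion are hypothetical names, and the code illustrates only the stated rule, not how ReadAvro inspects a schema.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical illustration only; not part of the DataFlow or Avro APIs. */
class UnionRuleSketch {

    /**
     * Return the non-NULL branch when the union has exactly two branches and
     * one of them is NULL; otherwise return empty (the union is unsupported).
     */
    static Optional<String> resolveUnion(List<String> branches) {
        if (branches.size() != 2 || !branches.contains("NULL")) {
            return Optional.empty();
        }
        for (String branch : branches) {
            if (!branch.equals("NULL")) {
                return Optional.of(branch);
            }
        }
        return Optional.empty(); // both branches were NULL
    }

    public static void main(String[] args) {
        System.out.println(resolveUnion(List.of("NULL", "STRING")));        // supported
        System.out.println(resolveUnion(List.of("INT", "STRING")));         // no NULL branch
        System.out.println(resolveUnion(List.of("NULL", "INT", "STRING"))); // too many branches
    }
}
```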
For information about creating files containing data in Avro format using DataFlow, see Using the WriteAvro Operator to Write Apache Avro Data.
When reading Avro files written by DataFlow, there may be additional metadata information about the data embedded within the files. If the reader has been configured to use this metadata, then it can obtain information about the ordering and partitioning of the data when it was written, which can eliminate the need to re-sort or partition the data.
Code Examples
Because Avro files are self-contained with respect to metadata, it is generally not necessary to provide any information other than the location of the data. Following is an example use of the operator in Java.
Using the ReadAvro operator in Java
ReadAvro reader = graph.add(new ReadAvro("data/ratings.avro"));
The following example demonstrates using the Avro reader within RushScript.
var data = dr.readAvro({source:'data/ratings.avro'});
Properties
The ReadAvro operator supports the following properties:
Ports
The ReadAvro operator provides a single output port.
Using the ReadORC Operator to Read Apache ORC Files
The ReadORC operator reads a data file previously written using the Apache Optimized Row Columnar (ORC) file format. The ORC format is supported by Apache Hive.
The ORC format is columnar-based but can also be autonomous. The columns in an ORC file separate the stripes or sections of the file. An internal index is used to track a section of the data within each column. This organization allows readers to efficiently omit the columns that are not required. Also, each column can apply a different compression method depending on the data type. Metadata about the ORC data, such as the schema and compression format, is serialized into the file and made available to readers.
The operator translates the ORC file schema into appropriate DataFlow types when possible. A few ORC data types are not supported for reading, and columns with unsupported data types are omitted. The output record type has fields with the same names and in the same order as the source schema. The type of each output field is assigned based on the schema of the source field with the same name.
In general, DataFlow types are assigned to the ORC schema types as shown in the following table.
Several ORC types are not supported by DataFlow. If these types are found in an ORC file, they are ignored, and the reader logs a message for each column omitted because of an unsupported data type. The following ORC data types are not supported:
• LIST
• MAP
• STRUCT
• UNION
Column Pruning
Because the ORC format is columnar, we recommend limiting the columns read to only those required for downstream processing. Use the selectedFields property to specify the fields to read. For more information, see Properties. The ORC columns not included in the list are omitted. This optimization provides a performance boost, especially for files containing a large number of columns.
Note: Before running the workflow, ensure that the client configuration and the jar files are added to the classpath. For more information, see Integrating DataFlow with Hadoop.
Code Examples
Because ORC files are self-contained with respect to metadata, it is generally necessary to provide only the location of the data to the operator. We recommend limiting the columns read to only those required for downstream processing; because the ORC format is columnar, reducing the columns read might enhance performance. Use the selectedFields property to specify the columns to read from a given ORC data set.
The following example reads ORC file data using Java.
Using ReadORC in Java
ReadORC reader = graph.add(new ReadORC("data/ratings.orc"));
Using ReadORC in RushScript
var data = dr.readORC({source:'data/ratings.orc'});
Properties
The ReadORC operator supports the following properties.
Ports
The ReadORC operator provides a single output port.
Using the ReadMDF Operator to Read MDF Files
This topic describes the DataFlow MDF read operator. For information about its KNIME node, see MDF Reader.
The ReadMDF operator reads a data file previously written using the ASAM MDF format. The MDF format is supported and maintained by ASAM.
MDF, or Measurement Data Format, is a binary file format used to store recorded and calculated data that is frequently used in post-measurement processing, off-line evaluation, or long-term storage.
It offers efficient and high performance storage of large amounts of measurement data. The file format allows the storage of the raw measurement data along with associated metadata and corresponding conversion formulas so that the raw data can still be interpreted correctly and utilized through post-processing.
The operator will translate the MDF schema into appropriate DataFlow types whenever possible, although because of the frequent usage of unsigned types in MDF data, sometimes the type used by DataFlow must be wider than the original type specified in the metadata to prevent loss of scale or precision.
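The need for a wider type follows from Java having no unsigned primitive integers: the full range of an unsigned value fits only in the next larger signed type. The sketch below shows that arithmetic with standard JDK methods. UnsignedWideningSketch is a hypothetical name, and which DataFlow type the operator actually selects for each MDF type is not stated here.

```java
import java.math.BigInteger;

/** Hypothetical illustration only; not part of the DataFlow API. */
class UnsignedWideningSketch {

    /** An unsigned 16-bit value needs a signed 32-bit int to keep its range. */
    static int widenUInt16(short rawBits) {
        return Short.toUnsignedInt(rawBits);
    }

    /** An unsigned 32-bit value needs a signed 64-bit long. */
    static long widenUInt32(int rawBits) {
        return Integer.toUnsignedLong(rawBits);
    }

    /** An unsigned 64-bit value exceeds long, so an arbitrary-precision type is used. */
    static BigInteger widenUInt64(long rawBits) {
        return new BigInteger(Long.toUnsignedString(rawBits));
    }

    public static void main(String[] args) {
        // All bits set: -1 when read as signed, the maximum when read as unsigned
        System.out.println(widenUInt16((short) -1)); // 65535
        System.out.println(widenUInt32(-1));         // 4294967295
        System.out.println(widenUInt64(-1L));        // 18446744073709551615
    }
}
```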
Since DataFlow operates on the concept of homogeneous data records within a given flow, a ReadMDF operator is only able to extract one record type from the file at a time, although multiple ReadMDF operators can read multiple record types concurrently from the same file.
The output record type will have fields with the names and types determined by the metadata provided in the file. The ordering of the fields will also correspond to the declaration order within the metadata, with the exception that the master channel will always be the first field even if it is not defined first.
Currently the operator only supports primitive types, which have an analog within DataFlow; therefore the extraction of MIME types in various media formats is not currently supported.
Code Examples
Since MDF files are self-contained with respect to metadata, it is generally not necessary to provide any information other than the location of the data and the data group containing the specified record that should be extracted. Following is an example use of the operator in Java.
Using the ReadMDF operator in Java
ReadMDF reader = graph.add(new ReadMDF("data/output.mf4"));
reader.setDataChannel(1);
reader.setRecordId(1);
The following example demonstrates using the MDF reader within RushScript:
var data = dr.readMDF({source:'data/output.mf4', dataChannel:1, recordId:1});
Properties
The ReadMDF operator supports the following properties:
Additional properties are shared with delimited text.
Ports
The ReadMDF operator provides a single output port.
Using the ReadParquet Operator to Read Apache Parquet Files
The ReadParquet operator reads data previously written using the Apache Parquet format. The Parquet format is supported by Apache Hive.
Parquet is a columnar file format used to store tabular data. Parquet supports efficient compression and encoding schemes and allows the compression scheme to be specified per column. It supports:
• Open source projects such as Apache Hadoop (MapReduce), Apache Hive, and Impala, because it presents the data in columnar format
• Compression codecs such as SNAPPY, GZIP, and LZO. The design allows integration with future types.
The ReadParquet operator uses Hive libraries through the shim layer and requires a Hadoop module configuration to be enabled, even if the workflow does not run on the cluster or access HDFS.
DataFlow automatically determines the equivalent data types from Parquet. The result is the output type of the reader. However, Parquet and DataFlow support different data types, and not all data in Parquet format can be read. If the operator attempts to read data that cannot be represented in DataFlow, an error is returned.
The primitive Parquet types are mapped to DataFlow as shown in the following table.
Note: Before running the workflow, ensure that the client configuration and the jar files are added to the classpath. For more information, see Integrating DataFlow with Hadoop.
Code Examples
The following example reads Parquet file data using Java.
Using ReadParquet in Java
// Path to read entire Hive table
ReadParquet reader = new ReadParquet("hdfs://10.100.10.41:8020//apps/hive/warehouse/CityParquet");
// Path to read specific partition of Hive table from HDFS
// ReadParquet reader = new ReadParquet("hdfs://10.100.10.41:8020//apps/hive/warehouse/CityParquet/000000_0");
// Path to read parquet file from Local file system
// ReadParquet reader = new ReadParquet("C:/Parquet/Cities.parquet");
graph.add(reader);
Using ReadParquet in RushScript
var data = dr.readParquet({source:"hdfs://10.100.10.41:8020//apps/hive/warehouse/CityParquet/000000_0"});
Properties
The ReadParquet operator supports the following properties.
Ports
The ReadParquet operator provides a single output port.
Using the ReadFromJDBC Operator to Read Databases
The ReadFromJDBC operator accesses relational database systems using a supplied JDBC driver. The JDBC driver must be in the class path of the DataFlow application. Each database provides a JDBC driver implementation that DataFlow can use to access data in the database. Reference the specific database to be accessed for driver-specific information.
The ReadFromJDBC operator can be used to read all of the columns from a specific table or to execute a provided query. The query provided can be a complex, multitable query. Follow the syntax guidelines of the database being queried.
The results of the query will be made available to the output port of the operator. The operator transforms the database column types to supported DataFlow scalar types. Some database-specific data types may not map well and will either be ignored or mapped to Java Object types.
The results of a database query executed through JDBC are returned through a ResultSet object. The ResultSet is used to iterate through the resultant rows and to access column data. The JDBC ResultSet class does not support multithreaded access. Given that, the default behavior of the ReadFromJDBC operator is to execute in nonparallel mode when provided a nonparameterized query.
To execute queries in parallel (and distributed), the ReadFromJDBC operator supports the use of parameterized queries. JDBC supports adding parameters to a query using the "?" character. Following is an example of a parameterized query. Note the use of the "?" character in the "where" clause.
Example query with parameters
select * from lineitem where l_shipmode = ?
When used as the data query for the ReadFromJDBC operator, a parameterized query can be executed in parallel. A set of parameters must be supplied to the parallel workers executing the parameterized queries. The parameters can be supplied in one of the following ways:
• Through the optional input port of the ReadFromJDBC operator.
• Obtained by a parameter query supplied as a property to the operator ("parameterQuery"). The query is executed and the results are used as parameters to the parameterized query.
• An array of values is passed as a property to the ReadFromJDBC operator ("parameters").
Here is an example of a parameter query:
Query to gather parameters
select distinct l_shipmode from lineitem
Note that the parameter query is selecting a distinct set of values from the lineitem table. The values will be substituted for the "?" in the parameterized query.
The parameters are handled the same whether they are provided directly as objects, read from the input port, or queried through the parameter query. For each set (row) of parameters, the following occurs:
• The parameters are substituted within the parameterized data query. From our example, one of the parameter values is "RAIL". When substituted within the example data query, the resultant query is effectively select * from lineitem where l_shipmode = 'RAIL'.
• The query with the substituted parameters is executed against the database.
• The results of the query are streamed to the output of the operator.
When used with a parameterized query and provided query parameters, the ReadFromJDBC operator operates in parallel by creating multiple workers. The query parameters are distributed to the workers in round-robin fashion. The workers execute the data query after applying parameter substitution as described above.
The order of the parameter value is important. The order must match the order of the parameters (the "?") in the data query. This is true for parameter values from the optional input port, provided as objects or from the parameter query. The ReadFromJDBC operator does not have the context to determine which parameter values match which parameters. The ordering of the parameter values is left to the user.
When using a parameterized query, the number of parameter values provided must match the number of parameters in the query. If there is a mismatch in sizes, an exception will be raised and the operator will fail.
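The count check described above can be approximated outside the operator, for example to validate a query and its parameter values before building a graph. The following sketch counts "?" placeholders while skipping single-quoted SQL literals. ParameterCountSketch and its methods are hypothetical names, the parsing is deliberately naive (it ignores comments and database-specific quoting), and it does not reflect how ReadFromJDBC performs its own check.

```java
import java.util.List;

/** Hypothetical illustration only; not part of the DataFlow or JDBC APIs. */
class ParameterCountSketch {

    /** Count "?" placeholders that appear outside single-quoted literals. */
    static int countPlaceholders(String query) {
        int count = 0;
        boolean inLiteral = false;
        for (char c : query.toCharArray()) {
            if (c == '\'') {
                inLiteral = !inLiteral; // a doubled quote ('') toggles twice, so state is preserved
            } else if (c == '?' && !inLiteral) {
                count++;
            }
        }
        return count;
    }

    /** Fail early when one row of parameter values does not match the query. */
    static void checkParameterCount(String query, List<?> values) {
        int expected = countPlaceholders(query);
        if (values.size() != expected) {
            throw new IllegalArgumentException(
                    "query has " + expected + " parameter(s) but " + values.size() + " value(s) were given");
        }
    }

    public static void main(String[] args) {
        String query = "select * from lineitem where l_shipmode = ?";
        checkParameterCount(query, List.of("RAIL")); // passes: one placeholder, one value
        System.out.println(countPlaceholders(query));
    }
}
```

A check like this verifies only the number of values. Their order still has to match the order of the placeholders, which no automatic check can confirm.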
To obtain the best performance, the number of sets of query parameters should be greater than the configured parallelism. In our example parameter query, only 7 values are returned. In this case, having parallelism set to anything greater than 7 will be wasteful. Additional streams of execution will have no data to process.
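The effect of parallelism on round-robin distribution is easy to see in a small simulation. The sketch below assigns parameter sets to workers by index modulo the worker count. RoundRobinSketch and distribute are hypothetical names, the ship mode values are illustrative, and the operator's actual scheduling may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not part of the DataFlow API. */
class RoundRobinSketch {

    /** Assign parameter set i to worker (i mod workers). */
    static List<List<String>> distribute(List<String> parameterSets, int workers) {
        List<List<String>> assignments = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            assignments.add(new ArrayList<>());
        }
        for (int i = 0; i < parameterSets.size(); i++) {
            assignments.get(i % workers).add(parameterSets.get(i));
        }
        return assignments;
    }

    public static void main(String[] args) {
        // Seven illustrative values standing in for the parameter query results
        List<String> shipModes = List.of("AIR", "FOB", "MAIL", "RAIL", "REG AIR", "SHIP", "TRUCK");
        System.out.println(distribute(shipModes, 3)); // every worker has queries to run
        System.out.println(distribute(shipModes, 8)); // the eighth worker receives nothing
    }
}
```

With three workers each one runs two or three queries. With eight, one worker has an empty list, which is the wasted stream of execution described above.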
Writing to database tables can be accomplished with the WriteToJDBC operator. For more information, see Using the WriteToJDBC Operator to Write to Databases.
Code Examples
The following example demonstrates using the ReadFromJDBC operator to access data in a database table. Setting the fetch size and the SQL warning limit properties is optional. Default settings will be used if they are not set explicitly.
Either a table name or a SQL statement to execute can be specified. Using a table name is equivalent to using the statement select * from tableName. In this example, a table name is specified.
Using the ReadFromJDBC Operator in Java
ReadFromJDBC reader = graph.add(new ReadFromJDBC());
reader.setDriverName("com.mysql.jdbc.Driver");
reader.setUrl("jdbc:mysql://dbserver:3306/test");
reader.setUser("test");
reader.setPassword("test");
reader.setSqlWarningLimit(20);
reader.setTableName("tpchorders");
Using the ReadFromJDBC Operator in RushScript
var data = dr.readFromJDBC({driverName:'com.mysql.jdbc.Driver', url:'jdbc:mysql://dbserver:3306/test', user:'test', password:'test', sqlWarningLimit:20, tableName:'tpchorders'});
The following example uses a SQL statement directly. Using the SQL statement allows selection of only the desired fields. A complex statement can also be used to join tables together and have the results presented as a single output data set.
Specifying a SQL statement
ReadFromJDBC reader = graph.add(new ReadFromJDBC());
reader.setDriverName("com.mysql.jdbc.Driver");
reader.setUrl("jdbc:mysql://dbserver:3306/test");
reader.setUser("test");
reader.setPassword("test");
reader.setDataQuery("select o_orderkey, o_orderdate, o_totalprice from totalorders");
This example demonstrates using a parameterized query in Java:
Using a parameterized query in Java
ReadFromJDBC reader = graph.add(new ReadFromJDBC());
reader.setDriverName("com.mysql.jdbc.Driver");
reader.setUrl("jdbc:mysql://dbserver:3306/test");
reader.setUser("test");
reader.setPassword("test");
reader.setSqlWarningLimit(20);
reader.setDataQuery("select * from lineitem where l_shipmode = ?");
reader.setParameterQuery("select distinct l_shipmode from lineitem");
This example demonstrates using a parameterized query in RushScript:
Using a parameterized query in RushScript
var data = dr.readFromJDBC({
driverName:'com.mysql.jdbc.Driver',
url:'jdbc:mysql://dbserver:3306/test',
user:'test',
password:'test',
sqlWarningLimit:20,
dataQuery:'select * from lineitem where l_shipmode = ?',
parameterQuery:'select distinct l_shipmode from lineitem'});
The driver name and URL format are specific to each JDBC driver. See the documentation of the specific database being used for more information on these values.
Properties
The
ReadFromJDBC operator supports the following properties:
Ports
The
ReadFromJDBC operator supports one optional input port. This port is used to provide parameters to a parameterized data query.
The
ReadFromJDBC operator provides one output port:
Using the ReadDelimitedText Operator to Read Delimited Text
The
ReadDelimitedText operator reads a text file of delimited records as record tokens. Records are identified by the presence of a non-empty, user-defined record separator sequence between each individual record. Output records contain the same fields as the input text. The reader can also filter and/or reorder the fields of the output as necessary.
Delimited text supports up to three distinct user-defined sequences within a record, used to identify field boundaries:
• a field separator: found between individual fields. By default, this is the comma character (,).
• a field start delimiter: marking the beginning of a field value. By default, this is the double quote character (").
• a field end delimiter: marking the end of a field value. By default, this is the double quote character (").
The field separator cannot be empty. The start and end delimiters can be the same value. They can also both (but not individually) be empty, signifying the absence of field delimiters. It is not expected that all fields start and end with a delimiter, though if one starts with a delimiter it must end with one. Fields containing significant characters, such as whitespace and the record and field separators, must be delimited to avoid parsing errors. Should a delimited field need to contain the end delimiter, it is escaped from its normal interpretation by duplicating it. For instance, the value "ab""c" represents a delimited field value of ab"c.
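The delimiter and escaping rules above can be sketched in plain Java. The DelimitedLineSketch class below is a hypothetical, simplified parser for a single record that uses the same character as start and end delimiter. It is not the parser used by ReadDelimitedText and ignores record separators entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-record parser for illustration; not DataFlow's implementation.
class DelimitedLineSketch {

    // Splits one record into fields. The same character opens and closes a delimited field.
    static List<String> parse(String record, String separator, char delimiter) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean delimited = false;
        int i = 0;
        while (i < record.length()) {
            char c = record.charAt(i);
            if (delimited) {
                if (c != delimiter) {
                    current.append(c);          // separators inside delimiters are plain text
                    i++;
                } else if (i + 1 < record.length() && record.charAt(i + 1) == delimiter) {
                    current.append(delimiter);  // doubled delimiter is an escaped literal
                    i += 2;
                } else {
                    delimited = false;          // closing delimiter
                    i++;
                }
            } else if (c == delimiter && current.length() == 0) {
                delimited = true;               // opening delimiter at the start of a field
                i++;
            } else if (record.startsWith(separator, i)) {
                fields.add(current.toString()); // field boundary
                current.setLength(0);
                i += separator.length();
            } else {
                current.append(c);
                i++;
            }
        }
        fields.add(current.toString());
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parse("\"ab\"\"c\",plain,\"x,y\"", ",", '"'));
        System.out.println(parse("1::1193::5::978300760", "::", '"'));
    }
}
```

The first call in main shows a doubled delimiter and a separator embedded in a delimited field; the second shows a multi-character field separator with no delimiters.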
The reader supports incomplete specification of the separators and delimiters. By default, it will attempt to automatically discover these values based on analysis of a sample of the file. We strongly suggest that this discovery ability not be relied upon if these values are already known, as it cannot be guaranteed to produce desirable results in all cases.
The reader requires a schema to provide parsing and type information for the fields. The schema, in conjunction with any specified field filter, defines the output type of the reader. This can be manually constructed through the API provided, although this metadata is often persisted externally. The
StructuredSchemaReader class provides support for reading in Pervasive Data Integrator structured schema descriptors (.schema files) for use with readers. Schemas can also be generated from
Record Token Types by using the
TextRecord.convert methods.
Because delimited text has explicit field markers, it is also possible to perform automated discovery of the schema based on the contents of the file; the reader provides a pluggable discovery mechanism to support this functionality. Custom mechanisms must implement the
TextRecordDiscoverer interface. Two implementations are provided in the operator library:
• A mechanism using pattern matching for determining field type. Values for a field are compared to the patterns; any patterns which do not match a field are discarded as possibilities. If multiple possibilities exist and the conflict is between numeric types (for example, integers and doubles), the wider of the two is chosen. Otherwise, conflicts are resolved by treating the field as a string. This is the default mechanism used by the operator.
The set of patterns used can be extended by providing additional patterns when setting the schemaDiscovery property. Alternatively, the
TextRecord.extendDefault method can be used to create a new discoverer using the supplied patterns in addition to the defaults. If the default patterns should not be included, create a
PatternBasedDiscovery object directly, specifying only the desired patterns.
• A mechanism that treats all fields as “raw” strings—that is, without white space trimming and not treating the empty string as NULL. Use the
TextRecord.TEXT_FIELD_DISCOVER constant to reference this mechanism.
Both built-in schema discoverers will generate a schema having as many fields as the longest analyzed row. Both use the header row, if present, to name the schema’s fields. Repetitions of the same name will be resolved by adding a suffix to avoid collision; any missing names will be generated as field<n>, where <n> is the field’s index in the schema.
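The conflict resolution used by pattern-based discovery can be approximated as follows. This is one simplified interpretation, assuming only integer, double, and string types: each value is classified by the narrowest pattern it matches, numeric conflicts widen to double, and any other conflict falls back to string. The TypeDiscoverySketch class and its patterns are hypothetical and much smaller than the operator's default pattern set.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of pattern-based type discovery; not the DataFlow implementation.
class TypeDiscoverySketch {

    enum FieldType { INT, DOUBLE, STRING }

    private static final Pattern INT_PATTERN = Pattern.compile("[+-]?\\d+");
    private static final Pattern DOUBLE_PATTERN =
            Pattern.compile("[+-]?(\\d+\\.?\\d*|\\.\\d+)([eE][+-]?\\d+)?");

    // Classifies a single value by the narrowest pattern that matches it.
    static FieldType classify(String value) {
        if (INT_PATTERN.matcher(value).matches()) return FieldType.INT;
        if (DOUBLE_PATTERN.matcher(value).matches()) return FieldType.DOUBLE;
        return FieldType.STRING;
    }

    // Numeric conflicts widen to DOUBLE; every other conflict becomes STRING.
    static FieldType merge(FieldType a, FieldType b) {
        if (a == b) return a;
        boolean bothNumeric = a != FieldType.STRING && b != FieldType.STRING;
        return bothNumeric ? FieldType.DOUBLE : FieldType.STRING;
    }

    // Folds the sampled values of one field into a single discovered type.
    static FieldType discover(List<String> values) {
        FieldType result = null;
        for (String value : values) {
            FieldType type = classify(value);
            result = (result == null) ? type : merge(result, type);
        }
        return result == null ? FieldType.STRING : result;
    }

    public static void main(String[] args) {
        System.out.println(discover(Arrays.asList("1", "2", "3")));
        System.out.println(discover(Arrays.asList("1", "2.5")));
        System.out.println(discover(Arrays.asList("1", "abc")));
    }
}
```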
Typically, the output of the reader includes all records in the file, both those with and without parsing errors. Fields that cannot be parsed are null-valued in the resulting record. If desired, the reader can be configured to filter failed records from the output.
Delimited text data may or may not have a header row. The header row is delimited as usual but contains the names of the fields in the data portion of the record. The reader must be told whether a header row exists. If it does, the parser will skip the header row; otherwise the first row is treated as a record and will appear in the output. If a header row does exist and any of the field names are blank, a field name will be generated. Generated field names take the form “fieldN” where N is the zero-based position of the field.
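Field name generation for blank header entries can be sketched as below. The "fieldN" form with a zero-based position comes from the description above. The "_2", "_3" suffix used here for repeated names is an assumption made for illustration, since the suffix format is not specified in this section. The HeaderNamesSketch class is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of header-based field naming; not the DataFlow implementation.
class HeaderNamesSketch {

    static List<String> fieldNames(List<String> header) {
        List<String> names = new ArrayList<>();
        Set<String> used = new HashSet<>();
        for (int i = 0; i < header.size(); i++) {
            String name = header.get(i) == null ? "" : header.get(i).trim();
            if (name.isEmpty()) {
                name = "field" + i;          // blank name: generate from zero-based position
            }
            String unique = name;
            int suffix = 2;
            while (!used.add(unique)) {      // assumed suffix format for repeated names
                unique = name + "_" + suffix++;
            }
            names.add(unique);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(fieldNames(Arrays.asList("id", "", "name", "id")));
    }
}
```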
Delimited text files can be parsed in parallel under “optimistic” assumptions: namely, that parse splits do not occur in the middle of a delimited field value and somewhere before an escaped record separator. This is assumed by default but can be disabled, with an accompanying reduction of scalability and performance.
When reading delimited text files there may be metadata information about the data embedded within the files. If the reader has been configured to use this metadata, it can obtain information about the ordering and partitioning of the data when it was written, which can eliminate the need to re-sort or partition the data.
Code Examples
The first code example shows a simple usage of the reader. The path to the local file name is given as a parameter to the constructor. This could have also been set using the setSource() method. The field separator and header properties are set. Then a record type is built and used as the input schema. Note that the record type must be converted to an acceptable schema before being used by the reader. Also note that the record separator is not specified. It will be determined by the auto discovery mechanism of the reader.
Using the ReadDelimitedText operator in Java
ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
reader.setFieldSeparator("::");
reader.setHeader(true);
RecordTokenType ratingsType = record(INT("userID"), INT("movieID"), INT("rating"), STRING("timestamp"));
reader.setSchema(TextRecord.convert(ratingsType));
Using the ReadDelimitedText operator in RushScript
var ratingsSchema = dr.schema().INT('userID').INT('movieID').INT('rating').STRING('timestamp');
var data = dr.readDelimitedText({source:'data/ratings.txt', fieldSeparator:'::', header:true, schema:ratingsSchema});
The snippet of data below is from the ratings.txt file and can be read using the code example above.
userID::movieID::rating::timestamp
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
This next example reads from a file in a Hadoop Distributed File System (HDFS). The hdfs URL scheme identifies the file as being contained within an HDFS file system. The authority section of the URL specifies the specific HDFS file system. The rest of the path indicates the file path within the HDFS instance. A schema is built for this data since it contains a date field. The format or pattern of the date field must be specified since it is non-standard.
Reading from HDFS with a date type
TextRecord schema =
SchemaBuilder.define(
SchemaBuilder.STRING("accountNumber"),
SchemaBuilder.STRING("clientName"),
SchemaBuilder.STRING("companyName"),
SchemaBuilder.STRING("streetAddress"),
SchemaBuilder.STRING("city"),
SchemaBuilder.STRING("state"),
SchemaBuilder.STRING("zip"),
SchemaBuilder.STRING("emailAddress"),
SchemaBuilder.DATE("birthDate", "MM/dd/yyyy"), // specify pattern for parsing the date
SchemaBuilder.STRING("accountCodes"),
SchemaBuilder.DOUBLE("standardPayment"),
SchemaBuilder.DOUBLE("payment"),
SchemaBuilder.DOUBLE("balance")
);
// Create a delimited text reader for the accounts data
ReadDelimitedText reader = graph.add(new ReadDelimitedText("hdfs://saturn.englab.local:9000/user/jfalgout/data/Accounts.txt"));
reader.setFieldSeparator(",");
reader.setHeader(true);
reader.setSchema(schema);
Following is a snippet of the data that can be read and parsed with the previous code example. Note that each field is surrounded with a double quote as the field delimiter. Also note the format of the "birthDate" field. It is a non-standard (not ISO) format. The schema used to parse the data specifies the pattern used to parse the date field.
"accountNumber","clientName","companyName","streetAddress","city","state","zip","emailAddress","birthDate","accountCodes","standardPayment","payment","balance"
"01-000667","George P Schell","Market Place Products","334 Hilltop Dr","Mentor","OH","44060-1930","warmst864@aol.com","02/28/1971","XA","101.00","100.00","15.89"
"01-002423","Marc S Brittan","Madson & Huth Communication Co","5653 S Blackstone Avenue, #3E","Chicago","IL","60637-4596","mapper@tcent.net","06/30/1975","BA","144.00","144.00","449.92"
"01-006063","Stephanie A Jernigan","La Salle Clinic","77565 Lorain","Akron","OH","44325-4002","dram@akron.net","11/02/1941","EB|CB","126.00","126.00","262.98"
"01-010474","Ernie Esser","Town & Country Electric Inc.","56 Pricewater","Waltham","MA","2453","hazel@bentley.net","12/15/1962","JA|RB","127.00","127.00","271.75"
"01-010852","Robert A Jacoby","Saturn of Baton Rouge","4001 Lafayette","Baton Rouge","LA","70803-4918","din33@norl.com","12/22/1985","ED|EA|RB|KA","142.00","150.00","423.01"
"01-011625","James C Felli","Bemiss Corp.","23A Carolina Park Circle","Spartanburg","SC","29303-9398","cadair@gw.com","02/21/1940","SB","151.00","155.00","515.41"
In the previous example, the schema could also be discovered, extending the default type patterns to recognize the date formats. This can be done in a fairly straightforward fashion:
Custom schema discovery
// Instead of constructing schema and calling reader.setSchema(schema)
TextDataType usDate= TextTypes.FORMATTED_DATE(new SimpleDateFormat("MM/dd/yyyy"));
List<TypePattern> patterns= Arrays.asList(new TypePattern("\\d{1,2}/\\d{1,2}/\\d+", usDate));
// Simple extension of default pattern-based discovery
reader.setSchemaDiscovery(patterns);
// Complete replacement of schema discoverer
// More interesting when using custom discovery implementation
TextRecordDiscoverer discoverer= TextRecord.extendDefault(patterns);
reader.setSchemaDiscovery(discoverer);
Properties
The
ReadDelimitedText operator supports the following properties:
Name | Type | Description |
analysisDepth | int | The number of characters to read for performing schema discovery and structural analysis. |
autoDiscoverNewline | String | Determines if the record separator should be auto-discovered. Default: enabled. |
charset | Charset | The character set used by the data source. Default: ISO-8859-1. |
charsetName | String | The character set used by the data source by name. |
decodeBuffer | int | The size of the buffer, in bytes, used to decode character data. By default, this will be automatically derived using the character set and read buffer size. |
discoveryNullIndicator | String | The text value used to represent null values by default in discovered schemas. By default, this is the empty string. |
discoveryStringHandling | StringConversion | The default behavior for processing string-valued types in discovered schemas. |
encoding | | Properties that control character set encoding. |
errorAction | CodingErrorAction | The error action determines how to handle errors encoding the input data into the configured character set. The default action is to replace the faulty data with a replacement character. |
extraFieldAction | | How to handle fields found when parsing the record, but not declared in the schema. |
fieldDelimiter | String | Delimiter used to denote the boundaries of a data field. |
fieldEndDelimiter | String | Ending delimiter used to denote the boundaries of a data field. |
fieldErrorAction | | How to handle fields that cannot be parsed. |
fieldLengthThreshold | int | The maximum length allowed for a field value before it is considered an error. |
fieldSeparator | String | Delimiter used to define the boundary between data fields. |
fieldStartDelimiter | String | Starting delimiter used to denote the boundaries of a data field. |
header | boolean | Whether to expect a header row in the source. The header row contains field names. |
includeSourceInfo | boolean | Determines whether output records will include additional fields detailing origin information for the record. If true, records will have three additional fields: • sourcePath – the path of the file from which the record originates. If this is not known, it will be NULL. • splitOffset – the offset of the starting byte of the containing split in the source data. • recordOffset – the offset of the first character of the record text from the start of the containing split. If these names would collide with those defined in the source schema, they will be renamed to avoid collision. These fields are added as the first three of the output and are not affected by the selectedFields property. |
lineComment | String | The character sequence indicating a line comment. Lines beginning with this sequence are ignored. |
maxRowLength | int | The limit, in characters, for the first row. Zero indicates no maximum. |
missingFieldAction | | How to handle fields declared in the schema, but not found when parsing the record. If the configured action does not discard the record, the missing fields will be null-valued in the output. |
parseErrorAction | | How to handle all parsing errors. |
parseOptions | | The parsing options used by the reader. |
pessimisticSplitting | boolean | Configures whether pessimistic file splitting must be used. By default, this is disabled. Pessimistic splitting defines one file split per file (assumes the input files are not splittable). |
readBuffer | int | The size of the I/O buffer, in bytes, to use for reads. Default: 64K. |
readOnClient | boolean | Determines whether reads are performed by the client or in the cluster. By default, reads are performed in the cluster if executed in a distributed context. |
recordSeparator | String | Value to use as a record separator. |
recordWarningThreshold | int | The maximum number of records which can have parse warnings before failing. |
replacement | String | Replacement string to use when encoding error policy is replacement. Default: '?' |
selectedFields | List<String> | The list of input fields to include in the output. Use this to limit the fields written to the output. |
schema | | The record schema expected in the delimited text source. This property is mutually exclusive with schemaDiscovery; setting one causes the other to be ignored. By default, this property is unset. |
schemaDiscovery | | The schema discovery mechanism to use. This property is mutually exclusive with schema; setting one causes the other to be ignored. By default, a pattern-based mechanism is used. Supplying a list of pattern/type pairs uses the default discoverer extended with the supplied patterns. |
source | ByteSource, Path, or String | Source of the input data to parse as delimited text. |
splitOptions | | The configuration used in determining how to break the source into splits. |
useMetadata | boolean | Whether the reader should use any discovered metadata about the ordering and distribution. Default: false. |
Ports
The
ReadDelimitedText operator provides a single output port.
Using the ReadFixedText Operator to Read Fixed-width Text
Fixed text data contains fields that are not delimited as CSV files are. A schema defines each field and its type, offset, and length within a row of data. Data is parsed from each input row according to the defined position of each field. Field types can be specified along with patterns for parsing the data. Patterns are especially useful for date and timestamp field types.
The
ReadFixedText operator reads a text file of fixed-width records as record tokens. Records are identified by the presence of a non-empty, user-defined record separator sequence between each individual record or by the total length of the record if an empty or zero-length record separator is provided. Output records contain the same fields as the input file. The parser can also filter or reorder the fields of the output, as requested.
The reader requires a
FixedWidthTextRecord object to provide field position as well as parsing and type information for fields. The schema, in conjunction with any specified field filter, defines the output type of the parser. These can be manually constructed through the API provided, although this metadata is often persisted externally.
StructuredSchemaReader provides support for reading in Pervasive Data Integrator structured schema descriptors (.schema files) for use with readers.
Typically, the output of the parsing includes all records in the file, both those with and without parsing errors. Fields that cannot be parsed are null-valued in the resulting record. If desired, the reader can be configured to filter failed records from the output.
Since record boundaries occur at known positions, fixed text files can be parsed in parallel.
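Position-based parsing can be illustrated without the operator itself. The sketch below slices one row by field widths, trims padding, and maps blank values to null, roughly mirroring a nullable, trimmed string conversion. The FixedWidthSketch class is hypothetical, handles only string fields, and is not how ReadFixedText is implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical fixed-width row slicer for illustration; not the DataFlow implementation.
class FixedWidthSketch {

    // Fields are taken in declaration order; each width fixes the offset of the next field.
    static Map<String, String> parse(String row, String[] names, int[] widths) {
        Map<String, String> record = new LinkedHashMap<>();
        int offset = 0;
        for (int i = 0; i < names.length; i++) {
            int end = Math.min(offset + widths[i], row.length());
            String raw = offset < row.length() ? row.substring(offset, end) : "";
            String trimmed = raw.trim();
            record.put(names[i], trimmed.isEmpty() ? null : trimmed); // blank field becomes null
            offset += widths[i];
        }
        return record;
    }

    public static void main(String[] args) {
        // Build a padded row: 9-character account number, 21-character name, 2-character state.
        String row = String.format("%-9s%-21s%-2s", "01-000667", "George P Schell", "OH");
        System.out.println(parse(row,
                new String[] {"accountNumber", "name", "state"},
                new int[] {9, 21, 2}));
    }
}
```

Because every offset is derived from the widths that precede it, a single wrong width shifts all later fields, which is why the field sizes in the schema must be exact.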
Code Examples
The following example builds a schema that is used by the
ReadFixedText operator to read a fixed text format file.
Using the ReadFixedText operator in Java
// Create fixed text reader
ReadFixedText reader = graph.add(new ReadFixedText("data/AccountsFixed.txt"));
// Build the schema. Fields must be added in order of appearance in records.
// The field size must be exact as it determines the position of the field for parsing.
FixedWidthTextRecord schema = new FixedWidthTextRecord(new TextConversionDefaults(StringConversion.NULLABLE_TRIMMED));
schema.defineField("accountNumber", new PaddedTextType(TextTypes.STRING, 9, ' ', Alignment.LEFT));
schema.defineField("name", new PaddedTextType(TextTypes.STRING, 21, ' ', Alignment.LEFT));
schema.defineField("companyName", new PaddedTextType(TextTypes.STRING, 31, ' ', Alignment.LEFT));
schema.defineField("address", new PaddedTextType(TextTypes.STRING, 35, ' ', Alignment.LEFT));
schema.defineField("city", new PaddedTextType(TextTypes.STRING, 16, ' ', Alignment.LEFT));
schema.defineField("state", new PaddedTextType(TextTypes.STRING, 2, ' ', Alignment.LEFT));
schema.defineField("zip", new PaddedTextType(TextTypes.STRING, 10, ' ', Alignment.LEFT));
schema.defineField("emailAddress", new PaddedTextType(TextTypes.STRING, 25, ' ', Alignment.LEFT));
schema.defineField("birthDate", new PaddedTextType(TextTypes.FORMATTED_DATE(new SimpleDateFormat("MM/dd/yyyy")), 10, ' ', Alignment.LEFT));
schema.defineField("accountCodes", new PaddedTextType(TextTypes.STRING, 11, ' ', Alignment.LEFT));
schema.defineField("standardPayment", new PaddedTextType(TextTypes.JAVA_DOUBLE, 6, ' ', Alignment.LEFT));
schema.defineField("payment", new PaddedTextType(TextTypes.JAVA_DOUBLE, 7, ' ', Alignment.LEFT));
schema.defineField("balance", new PaddedTextType(TextTypes.JAVA_DOUBLE, 6, ' ', Alignment.LEFT));
// Set the schema of the reader.
reader.setSchema(schema);
An example of data that can be read with the above code fragment follows. Because of the wide nature of the data, the records will most likely appear across multiple lines of the display.
01-000667George P Schell      Market Place Products          334 Hilltop Dr                     Mentor          OH44060-1930warmst864@aol.com        02/28/1971XA         101.00100.00 15.89
01-002423Marc S Brittan       Madson & Huth Communication Co 5653 S Blackstone Avenue, #3E      Chicago         IL60637-4596mapper@tcent.net         06/30/1975BA         144.00144.00 449.92
01-006063Stephanie A Jernigan La Salle Clinic                77565 Lorain                       Akron           OH44325-4002dram@akron.net           11/02/1941EB|CB      126.00126.00 262.98
01-010474Ernie Esser          Town & Country Electric Inc.   56 Pricewater                      Waltham         MA2453      hazel@bentley.net        12/15/1962JA|RB      127.00127.00 271.75
01-010852Robert A Jacoby      Saturn of Baton Rouge          4001 Lafayette                     Baton Rouge     LA70803-4918din33@norl.com           12/22/1985ED|EA|RB|KA142.00150.00 423.01
01-011625James C Felli        Bemiss Corp.                   23A Carolina Park Circle           Spartanburg     SC29303-9398cadair@gw.com            02/21/1940SB         151.00155.00 515.41
01-018448Alan W Neebe         Georgia State Credit Union     PO Box 159                         Demorest        GA30535-1177delores@truett.com       01/31/1960MA|ED|SB   113.00120.00 131.89
01-018595Alexander Gose       Office Support Services        436 Green Mountain Circle          New Paltz       NY12561-0023dams@matrix.net          06/19/1940EC         147.00147.00 477.09
The following example demonstrates using the ReadFixedText operator in RushScript. The schema is created in RushScript and passed to the operator.
Using the ReadFixedText operator in RushScript
// Build the schema. Fields must be added in order of appearance in records.
// The field size must be exact as it determines the position of the field for parsing.
var accountsFixedSchema = dr.schema({type:'FIXED'})
.nullable(true)
.trimmed(true)
.padChar(' ')
.alignment('LEFT')
.STRING("accountNumber", {size:9})
.STRING("clientName", {size:21})
.STRING("companyName", {size:31})
.STRING("streetAddress", {size:35})
.STRING("city", {size:16})
.STRING("state", {size:2})
.STRING("zip", {size:10})
.STRING("emailAddress", {size:25})
.DATE("birthDate", {pattern:'MM/dd/yyyy', size:10})
.STRING("accountCodes", {size:11})
.DOUBLE("standardPayment", {pattern:'0.00', size:6})
.DOUBLE("payment", {pattern:'0.00', size:7})
.DOUBLE("balance", {pattern:'0.00', size:6});
// Read the data
var data = dr.readFixedText({source:'/path/to/file.txt', schema:accountsFixedSchema});
Properties
The
ReadFixedText operator supports the following properties:
Ports
The
ReadFixedText operator provides a single output port.
Using the ReadSource Operator to Read Generic Sources
The
ReadSource operator reads a defined data source as a stream of records. The data source provides a sequence of bytes in some format that can be parsed into records that are assumed to be identical in logical structure. The mapping between physical and logical structure is encapsulated in a format descriptor, which must be provided.
This operator is low level, providing a generalized model for reading files in a distributed fashion. Typically, the
ReadSource operator is not directly used in a graph, instead being indirectly used through a composite operator such as one derived from
AbstractReader, providing a more appropriate interface to the end user.
Parallelized reads are implemented by breaking input files into independently parsed pieces, a process called splitting. Splits are then distributed to available partitions and parsed. When run on a distributed cluster, the reader makes an attempt to assign splits to machines where the I/O will be local, but non-local assignment may occur in order to provide work for all partitions. Distributed execution also makes an assumption that the specified data source is accessible from any machine. If this is not the case, the read operator must be made non-parallel by invoking the disableParallelism() method on the operator instance.
Not all formats support splitting; this generally requires a way of unambiguously identifying record boundaries. Formats will indicate whether they can support splitting. If not, each input file will be treated as a single split. Even with a non-splittable format, this means reading multiple files can be parallelized. Some formats can partially support splitting, but in an “optimistic” fashion; under most circumstances splits can be handled, but in some edge cases splitting leads to parse failures. For these cases, the reader supports a “pessimistic” mode that can be used to assume a format is non-splittable, regardless of what it reports.
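The difference between normal and pessimistic splitting can be sketched as a computation of byte ranges for one file. This is a simplified illustration: the SplitSketch class and its splitSize parameter are hypothetical, and the real reader chooses split sizes from its own configuration and must still align each split with record boundaries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of dividing one file into splits; not the DataFlow implementation.
class SplitSketch {

    // Returns {offset, length} pairs covering the file.
    static List<long[]> computeSplits(long fileSize, long splitSize, boolean pessimistic) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<long[]> splits = new ArrayList<>();
        if (pessimistic || fileSize <= splitSize) {
            // Non-splittable (or small) file: one split covers the whole file.
            splits.add(new long[] {0, fileSize});
            return splits;
        }
        for (long offset = 0; offset < fileSize; offset += splitSize) {
            splits.add(new long[] {offset, Math.min(splitSize, fileSize - offset)});
        }
        return splits;
    }

    public static void main(String[] args) {
        for (long[] split : computeSplits(250, 100, false)) {
            System.out.println("offset=" + split[0] + " length=" + split[1]);
        }
        System.out.println("pessimistic splits: " + computeSplits(250, 100, true).size());
    }
}
```

With pessimistic splitting, parallelism comes only from the number of files, since each file yields exactly one split.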
The reader makes a best-effort attempt to validate the data source before execution but cannot always guarantee correctness, depending on the nature of the data source. This is done to try to prevent misconfigured graphs from executing, such as when the reader may not execute until a late phase where a failure may result in a significant amount of work being lost.
Tip: This is a low-level operator that typically is not directly used. It can be used with a custom data format. A custom data format may be needed to support a format not provided by the DataFlow library.
Code Example
This example code fragment demonstrates how to set up a reader for a generic file type.
Using the ReadSource operator
ReadSource reader = new ReadSource();
reader.setSource(new BasicByteSource("filesource"));
reader.setFormat(new DelimitedTextFormat(TextRecord.convert(record(INT("intfield"), STRING("stringfield"))),
new FieldDelimiterSettings(),
new CharsetEncoding()));
ParsingOptions options = new ParsingOptions();
options.setSelectedFields("stringfield");
reader.setParseOptions(options);
Properties
The
ReadSource operator supports the following properties:
Ports
The
ReadSource operator provides a single output port:
Using the ReadLog Operator to Read Log Data
Many applications and systems produce log data that is loosely structured. Generally, there is a specific format used to write the log data; however, this format is not always unambiguously reversible to a typical parser. Also, different fields might use different field separators and delimiters.
About the only generalization that can be made about all log formats is that the records always contain an ID field, usually a timestamp, and a message field consisting of the information that produced the log event. Because of this loose structure, the log data often cannot be read by a regular delimited or fixed text reader.
The
ReadLog operator reads a text file or alternative source consisting of log events from a particular application or system. The type of application or system producing the log records must be specified in advance through a property setting. The currently supported log types are enumerated by
SupportedLogType. Configuring the operator requires the user to either provide one of these enumerations or their own implementation of a particular
LogFormat. It should be noted that these settings are mutually exclusive.
In addition to the log type, a format pattern may be set. This is a String that provides information about a log format when customization of the format is allowed. Its syntax is specific to the type of log being read, as described for each supported log type. Additionally, the newline character used by the log files may be specified if a nondefault newline character is being used. By default, this is determined automatically by examining the first few lines in the source.
The record flow generated by this operator is determined by the log type being read and the log format pattern provided during composition of the operator unless otherwise noted.
Supported Log Types
The
ReadLog operator supports a selection of common log formats. These are enumerated by
SupportedLogType. Custom log formats can be added by implementing the
LogFormat interface. A custom format would be instantiated and provided to the
ReadLog operator through the logFormat property. Certain log types can also be manually instantiated and provided to the
ReadLog operator when log-specific settings need to be changed, such as log4j’s logging levels.
The various supported log types are listed below.
Generic Log Data
The generic type can be used when the log data can be parsed using a regular expression but there is no dedicated format for the log. The schema is automatically generated by counting the number of groupings in the regular expression provided. The schema can also be set manually by creating a custom instance of the log format.
The generic format takes a valid Java regular expression string. The grouping of the regular expression defines the fields the individual records will be split into.
Default : "(.*)"
Example : "(\\d\\d.\\d+) (\\w+) (\\w+)"
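How regular expression groups translate into fields can be seen with the standard java.util.regex classes. The sketch below applies the example expression above to a made-up log line. The GenericLogSketch class and the sample line are hypothetical, and the code only illustrates the grouping, not how ReadLog builds its schema.

```java
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of splitting a log line by capture groups.
class GenericLogSketch {

    // Returns one value per capture group, or null if the line does not match.
    static String[] split(Pattern pattern, String line) {
        Matcher matcher = pattern.matcher(line);
        if (!matcher.matches()) {
            return null;
        }
        String[] fields = new String[matcher.groupCount()];
        for (int group = 1; group <= matcher.groupCount(); group++) {
            fields[group - 1] = matcher.group(group);
        }
        return fields;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("(\\d\\d.\\d+) (\\w+) (\\w+)");
        // The number of groups determines the number of fields: three here.
        System.out.println(pattern.matcher("").groupCount());
        System.out.println(Arrays.toString(split(pattern, "12.345 INFO started")));
    }
}
```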
Common Log Format
The CLF type can be used when reading a web server log in common log format. NCSA common log format is specified at
http://www.w3.org/Daemon/User/Config/Logging.html#common-logfile-format. Since CLF is well defined, it does not allow a format pattern to be specified.
Combined Log Format
The Combined type can be used when reading a web server log in combined log format. NCSA combined log format is specified at
http://publib.boulder.ibm.com/tividd/td/ITWSA/ITWSA_info45/en_US/HTML/guide/c-logs.html#ncsa.
Combined takes a true or false string, which determines if the optional cookie field is included in the log.
Extended Log Format
The ELF type can be used when reading a web server log in extended log format. Extended log format is specified at
http://www.w3.org/TR/WD-logfile.html.
ELF will accept a string in the same form as a Fields directive as specified in the official format. If format discovery is enabled, it will scan the file for any directives and apply them appropriately.
Example : "#Fields: date time cs-method cs-uri"
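A Fields directive is a prefix followed by whitespace-separated field identifiers, so the field list can be extracted with a few lines of Java. The ElfFieldsSketch class below is a hypothetical illustration and not the parser used by the ELF format.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical parser for an extended log format Fields directive.
class ElfFieldsSketch {

    private static final String PREFIX = "#Fields:";

    // Returns the field identifiers listed after the "#Fields:" prefix.
    static List<String> fieldNames(String directive) {
        String trimmed = directive.trim();
        if (!trimmed.startsWith(PREFIX)) {
            throw new IllegalArgumentException("not a Fields directive: " + directive);
        }
        String rest = trimmed.substring(PREFIX.length()).trim();
        if (rest.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(rest.split("\\s+"));
    }

    public static void main(String[] args) {
        System.out.println(fieldNames("#Fields: date time cs-method cs-uri"));
    }
}
```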
GlassFish Logs
The GlassFish format supports reading logs produced by GlassFish servers. The GlassFish server log format is specified at
http://docs.oracle.com/cd/E18930_01/html/821-2416/abluk.html.
The format pattern supported by the GlassFish format consists of a string that specifies the date format used in the timestamps of the log. Any pattern string supported by Java’s SimpleDateFormat class is acceptable.
Default : "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
Example : "dd-MM-yyyy HH:mm:ss"
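To check a date pattern before using it as a format pattern, it can be tried against a sample timestamp. The sketch below assumes the pattern follows java.text.SimpleDateFormat syntax. The TimestampPatternSketch class and the sample timestamps are hypothetical and unrelated to the operator's internals.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

// Hypothetical helper for trying a date pattern against a sample timestamp.
class TimestampPatternSketch {

    // Parses strictly; wraps the checked exception so callers get a clear failure.
    static Date parse(String pattern, String text) {
        SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.ROOT);
        format.setLenient(false);
        try {
            return format.parse(text);
        } catch (ParseException e) {
            throw new IllegalArgumentException(
                    "'" + text + "' does not match pattern '" + pattern + "'", e);
        }
    }

    public static void main(String[] args) {
        // Default pattern: the zone offset is part of the timestamp.
        System.out.println(parse("yyyy-MM-dd'T'HH:mm:ss.SSSZ", "2012-03-15T10:20:30.123-0600"));
        // Custom pattern without a zone: interpreted in the JVM default time zone.
        System.out.println(parse("dd-MM-yyyy HH:mm:ss", "15-03-2012 10:20:30"));
    }
}
```

A pattern without a zone designator, such as the second one, yields results that depend on the time zone of the machine doing the parsing.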
Log4j Logs
The log4j format supports reading logs produced by the Apache log4j library for Java. More information about the library can be found at
http://logging.apache.org/log4j/1.2/.
The log4j format will accept a string in the same form as the conversion pattern that specifies the logging. More information about log4j conversion patterns can be found at
http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/EnhancedPatternLayout.html.
Default : "%r [%t] %-5p %c %x - %m%n"
Example : "%d{ISO8601} %p %c: %m%n"
Syslog Logs
The syslog format supports reading logs produced by syslogd and other BSD-compliant syslog producers. The BSD syslog format is specified by
RFC-3164.
The format pattern supported by the syslog format consists of a string that includes the current four-digit year and the signed four-digit offset from UTC separated with a single space.
Default : current year and timezone
Example : "2012 -0600"
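BSD syslog timestamps carry only the month, day, and time of day (for example "Oct 11 22:14:15"), which is why the format pattern supplies the year and UTC offset. The following sketch shows one way those two pieces could be combined with a timestamp using java.time. The class SyslogTimestampDemo is hypothetical, and this is not necessarily how the ReadLog operator resolves timestamps internally.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.Locale;

// Illustration only: this class is not part of the DataFlow API.
class SyslogTimestampDemo {

    // formatPattern: "<four-digit year> <signed four-digit offset>", e.g. "2012 -0600".
    // syslogTimestamp: "Mmm dd hh:mm:ss" as written by BSD syslog producers.
    static OffsetDateTime resolve(String formatPattern, String syslogTimestamp) {
        String[] parts = formatPattern.trim().split(" ");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected '<year> <offset>': " + formatPattern);
        }
        int year = Integer.parseInt(parts[0]);
        ZoneOffset offset = ZoneOffset.of(parts[1]);

        // The timestamp has no year field, so default it from the pattern.
        DateTimeFormatter formatter = new DateTimeFormatterBuilder()
                .appendPattern("MMM d HH:mm:ss")
                .parseDefaulting(ChronoField.YEAR, year)
                .toFormatter(Locale.ENGLISH);

        // Single-digit days are space padded ("Oct  1"); collapse the padding.
        String normalized = syslogTimestamp.trim().replaceAll("\\s+", " ");
        return LocalDateTime.parse(normalized, formatter).atOffset(offset);
    }

    public static void main(String[] args) {
        System.out.println(resolve("2012 -0600", "Oct 11 22:14:15"));
    }
}
```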
Code Examples
This example code fragment demonstrates how to set up a reader for a log4j log file.
Using the ReadLog operator in Java
ReadLog reader = graph.add(new ReadLog("data/log4jdata.log"));
reader.setLogType(SupportedLogType.LOG4J);
reader.setLogPattern("%d{ISO8601} %p %c: %m%n");
reader.setNewline("\n");
Using the ReadLog operator in RushScript
var data = dr.readLog({source:'data/log4jdata.log', logType:'LOG4J', logPattern:'%d{ISO8601} %p %c: %m%n', newLine:'\n'});
Properties
The
ReadLog operator supports the following properties:
Ports
The
ReadLog operator provides a single output port.
Using the ReadARFF Operator to Read Sparse Data
This topic describes the DataFlow ARFF read operator. For information on its KNIME node, see
ARFF Reader.
Sparse data is useful for data sets that contain a large number of fields where most of the fields do not have data values. This is mostly the case with numeric data, but can also be applied to enumerated data types. A common example is a data set that contains a row-per-website user with a field-per-website page. Each field contains a count of the number of times a user has visited the specific page. Most users will visit only a fraction of the overall pages on the website. Using a sparse data representation will allow the data set to be much smaller in size than a fully populated, dense data set.
DataFlow supports sparse data using the Attribute-Relation File Format (ARFF). The
ReadARFF operator is used to read sparse data stored in ARFF. Files using ARFF can be in either sparse or dense mode. This reader detects the mode and reads the data accordingly. ARFF files contain schema information. The schema is parsed and used to determine how to parse data records.
ARFF can be parsed in parallel under “optimistic” assumptions: namely, that parse splits do not occur in the middle of a delimited field value and somewhere before an escaped record separator. This is assumed by default, but can be disabled with an accompanying reduction of scalability and performance.
Although DataFlow uses ARFF primarily to represent sparse data, the format can also store dense data in a CSV-like layout. The ARFF mode indicates which layout a file uses, sparse or dense, and the reader discovers the mode automatically.
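To illustrate the difference between the two modes, the sketch below formats the same row of page-visit counts as a dense ARFF data line and as a sparse one. In sparse ARFF, each row is written as a brace-enclosed list of zero-based index and value pairs, and omitted entries are taken to be zero. The class SparseRowDemo and the sample counts are made up for illustration and are not part of the DataFlow API.

```java
import java.util.StringJoiner;

// Illustration only: this class is not part of the DataFlow API.
class SparseRowDemo {

    // Dense layout: every value is written, separated by commas.
    static String toDense(int[] row) {
        StringJoiner joiner = new StringJoiner(",");
        for (int value : row) {
            joiner.add(Integer.toString(value));
        }
        return joiner.toString();
    }

    // Sparse layout: only non-zero values are written, as "index value" pairs.
    static String toSparse(int[] row) {
        StringJoiner joiner = new StringJoiner(", ", "{", "}");
        for (int i = 0; i < row.length; i++) {
            if (row[i] != 0) {
                joiner.add(i + " " + row[i]);
            }
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        int[] visits = {0, 3, 0, 0, 1, 0}; // made-up visit counts for six pages
        System.out.println(toDense(visits));
        System.out.println(toSparse(visits));
    }
}
```

The saving grows with the number of fields: a row with thousands of mostly zero fields shrinks to a handful of pairs.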
The ARFF metadata also contains two other data values: the relation name and comments. The relation name is specified as one of the metadata headers. Comments are lines that start with the "%" character. Comments are returned as a list of String values.
Code Examples
Since ARFF includes metadata that contains field names and types, the schema for ARFF files does not have to be specified. The metadata can be accessed using the discoverMetadata() method on the reader after the data source has been configured. The metadata can be used to access the relation name, comments, ARFF mode, and data schema. The schema contains the field names and types along with patterns for parsing and formatting field values.
Using the ReadARFF operator in Java
// Create ARFF reader
ReadARFF reader = graph.add(new ReadARFF("data/weather.arff"));
// Get metadata for the configured data source
Analysis metadata = reader.discoverMetadata(FileClient.basicClient());
ARFFMode mode = metadata.getMode();
String relationName = metadata.getRelationName();
List<String> comments = metadata.getComments();
TextRecord schema = metadata.getSchema();
// Dump out metadata values
System.out.println("mode = " + mode);
System.out.println("relationName = " + relationName);
System.out.println("comments = " + comments);
System.out.println("schema = " + schema.getFieldNames());
Following is a snippet of output from running an application with the previous code fragment.
mode = DENSE
relationName = weather
comments = []
schema = [outlook, temperature, humidity, windy, play]
Using the ReadARFF operator in RushScript
var data = dr.readARFF({source:'data/weather.arff'});
The weather.arff file’s contents:
@relation weather
@attribute outlook {sunny, overcast, rainy}
@attribute temperature real
@attribute humidity real
@attribute windy {TRUE, FALSE}
@attribute play {yes, no}
@data
sunny,85,85,FALSE,no
sunny,80,90,TRUE,no
overcast,83,86,FALSE,yes
rainy,70,96,FALSE,yes
rainy,68,80,FALSE,yes
rainy,65,70,TRUE,no
overcast,64,65,TRUE,yes
sunny,72,95,FALSE,no
sunny,69,70,FALSE,yes
rainy,75,80,FALSE,yes
sunny,75,70,TRUE,yes
overcast,72,90,TRUE,yes
overcast,81,75,FALSE,yes
rainy,71,91,TRUE,no
Properties
The
ReadARFF operator supports the following properties:
Ports
The
ReadARFF operator provides one output port:
Using the ReadStagingDataset Operator to Read Staging Datasets
Staging data sets are used within DataFlow for writing intermediate data into a fast and efficient binary format. They are also very convenient since metadata is stored in a header section of the data set. When reading a data set, the metadata is used to determine the fields and types of data contained within the data set.
Text files can be used for intermediate data access also but incur the overhead of formatting and parsing. Text files also can introduce data errors due to having to convert numeric data into textual formats. Staging data sets do not require the round trip to and from text and so bypass these issues.
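The kind of data error mentioned above can be reproduced in plain Java. The sketch below assumes a text format that writes numbers with six decimal places, which is a made-up choice for illustration, and compares it with a round trip through the value's binary representation. The class RoundTripDemo is hypothetical and does not reflect the actual staging data set encoding.

```java
import java.util.Locale;

// Illustration only: this class is not part of the DataFlow API.
class RoundTripDemo {

    // Writes the value as text with six decimal places, then parses it back.
    static double viaText(double value) {
        String text = String.format(Locale.ROOT, "%.6f", value);
        return Double.parseDouble(text);
    }

    // Converts the value to its 64-bit binary form and back.
    static double viaBinary(double value) {
        long bits = Double.doubleToLongBits(value);
        return Double.longBitsToDouble(bits);
    }

    public static void main(String[] args) {
        double original = 1.0 / 3.0;
        System.out.println("text round trip exact:   " + (viaText(original) == original));
        System.out.println("binary round trip exact: " + (viaBinary(original) == original));
    }
}
```

Text formats that write enough digits can round-trip exactly, but they still pay the formatting and parsing cost on every pass.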
The DataFlow framework uses staging data sets in the following situations:
• In between application phases where one phase feeds data to the next
• Staging data to disk during merge sort operations
• Staging data to disk due to queue overflow in deadlock situations
There are several use cases for directly using staging data sets. A few examples are:
• A particular task that needs to be accomplished must be broken into several DataFlow applications. Data from each application must be passed to the next. Staging data sets can be used to capture the intermediate data, allowing for quick and easy access.
• Data within a database is being used to build a predictive model. This development will require many iterations of algorithm testing with the data. For this use case, the most efficient route is to first read the data from the source database and write it to staging data sets. Then use the staged data as input into the predictive analytic development. The one-time read from the database is overhead. However, given that the source data will be read many times, the savings are substantial.
As with other file-based operators, the
ReadStagingDataset operator accepts several data sources:
• A Path reference
• A String reference containing:
– The path to a single data set
– A path to a directory containing many data sets with the same format and schema
– A path with wildcard characters within the file name that will be resolved to the matching data set files
Code Examples
This code example constructs a new
ReadStagingDataset operator by passing in the name of the file to read as a constructor parameter. Properties for setting the I/O buffer size and the selected fields are then set. The selected fields must contain field names that exist in the data set. Use the selectedFields property to constrain the fields read from the data set and provided to the output port of the operator.
Using the ReadStagingDataset operator in Java
ReadStagingDataset reader = graph.add(new ReadStagingDataset("results/ratings-stage"));
reader.setReadBuffer(128 * 1024);
reader.setSelectedFields(Arrays.asList(new String[] {"userID", "rating"}));
Using the ReadStagingDataset operator in RushScript
var data = dr.readStagingDataset({source:'results/ratings-stage', readBuffer:131072});
The operator also supports a static method for reading the metadata of a staging data set. The following code example depicts how to read and use the metadata.
Using staging dataset metadata
ReadStagingDataset reader = graph.add(new ReadStagingDataset("results/ratings-stage"));
reader.setReadBuffer(128 * 1024);
reader.setSelectedFields(Arrays.asList(new String[] {"userID", "rating"}));
DatasetMetadata metadata = reader.discoverMetadata();
long rowCount = metadata.getRowCount();
int blockSize = metadata.getBlockSize();
TokenType datasetType = metadata.getSchema();
DatasetStorageFormat format = metadata.getStorageFormat();
System.out.println("rowCount = " + rowCount);
System.out.println("blockSize = " + blockSize);
System.out.println("datasetType = " + datasetType);
System.out.println("format = " + format);
The output of the metadata information from the previous application is shown following. This metadata can be used within an application as needed.
rowCount = 1000
blockSize = 64
datasetType = {"type":"record","representation":"DENSE_BASE_NULL","fields":[{"userID":{"type":"int"}},{"movieID":{"type":"int"}},{"rating":{"type":"int"}},{"timestamp":{"type":"string"}}]}
format = COMPACT_ROW
Properties
The
ReadStagingDataset operator supports the following properties:
Ports
The
ReadStagingDataset operator provides a single output port:
Using the ParseTextFields Operator to Parse Text Records
The
ParseTextFields operator is similar to the
ReadDelimitedText and other text-based read operators. However, instead of reading from a source of text, it takes a flow of records consisting of text fields as input. These input strings are then parsed into other value types as specified by the schema provided to the operator. Additionally, it emits a flow of records which failed parsing, allowing remediation to be performed on invalid records.
The parsed output will have the type specified by the schema. Output fields will contain the result of processing the input field of the same name according to the type information in the provided schema. Referenced input fields must either be string valued, in which case they are parsed according to the schema, or of a type which is assignable to the output field's type, in which case they are copied as-is.
If a field is present in the schema, but not in the input, the output field is NULL. If an input value is NULL, the resulting output field is NULL. Input fields without a matching field in the schema are dropped.
The rejected output has the same type as the input. Field values will match those of the failed input record.
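The field-matching rules above can be modeled in a few lines of plain Java. The sketch below is not the DataFlow implementation: the class ParseTextFieldsSketch, its schema representation (a map from field name to a parsing function), and the sample values are all hypothetical. It also assumes that a record that fails parsing goes only to the rejected flow.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustration only: a plain-Java model of the rules above, not the DataFlow API.
class ParseTextFieldsSketch {

    final List<Map<String, Object>> parsed = new ArrayList<>();
    final List<Map<String, String>> rejected = new ArrayList<>();

    // Hypothetical schema: field name -> parser for that field's type.
    static Map<String, Function<String, Object>> demoSchema() {
        DateTimeFormatter date = DateTimeFormatter.ofPattern("MM/dd/yyyy");
        Map<String, Function<String, Object>> schema = new LinkedHashMap<>();
        schema.put("company", s -> s);
        schema.put("acctCodes", s -> s);
        schema.put("balance", s -> Double.valueOf(s));
        schema.put("startDate", s -> LocalDate.parse(s, date));
        return schema;
    }

    // Builds an input record from alternating field names and text values.
    static Map<String, String> record(String... namesAndValues) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i + 1 < namesAndValues.length; i += 2) {
            fields.put(namesAndValues[i], namesAndValues[i + 1]);
        }
        return fields;
    }

    void process(Map<String, String> input, Map<String, Function<String, Object>> schema) {
        Map<String, Object> output = new LinkedHashMap<>();
        try {
            for (Map.Entry<String, Function<String, Object>> field : schema.entrySet()) {
                // Missing or NULL input yields NULL output; extra input fields are dropped.
                String raw = input.get(field.getKey());
                output.put(field.getKey(), raw == null ? null : field.getValue().apply(raw));
            }
            parsed.add(output);
        } catch (RuntimeException e) {
            // Assumption of this sketch: a failed record is emitted unchanged to rejects only.
            rejected.add(input);
        }
    }

    public static void main(String[] args) {
        ParseTextFieldsSketch sketch = new ParseTextFieldsSketch();
        sketch.process(record("acctNumber", "1001", "company", "Acme",
                "startDate", "01/15/2012", "balance", "250.75"), demoSchema());
        sketch.process(record("acctNumber", "1002", "company", "Globex",
                "startDate", "2012-01-15", "balance", "10"), demoSchema());
        System.out.println("parsed:   " + sketch.parsed);
        System.out.println("rejected: " + sketch.rejected);
    }
}
```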
ParseTextFields only performs semantic validation of the input; the input must have been already broken into field values. This can be accomplished by using
ReadDelimitedText with a schema consisting of all string fields or by custom operators. It should also be noted that a special schema discoverer
TextRecord.TEXT_FIELD_DISCOVER is provided for when the schema of the source field may not be known.
Code Examples
Following is an example usage of the parser. In this case, notice that the input field acctNumber is not defined in the schema, so it is absent from the outputs. Conversely, acctCodes is present in the schema but not in the input; it is present in the output but will be NULL for every record. Additionally, note that the input fields that are in the schema appear in a different order.
Using ParseTextFields in Java
// Define a "raw" schema, identifying fields but not parsing content
RecordTokenType rawType=record(STRING("acctNumber"),STRING("company"),
STRING("startDate"),STRING("balance"));
TextRecord rawSchema= TextRecord.convert(rawType, StringConversion.RAW);
// Read "raw" fields
ReadDelimitedText reader= graph.add(new ReadDelimitedText(...));
reader.setSchema(rawSchema);
// Define the parsing schema
TextRecord schema =
SchemaBuilder.define(
SchemaBuilder.STRING("company"),
SchemaBuilder.STRING("acctCodes"),
SchemaBuilder.DOUBLE("balance"),
SchemaBuilder.DATE("startDate", "MM/dd/yyyy") // specify pattern for parsing the date
);
// Parse the text records
ParseTextFields parser= graph.add(new ParseTextFields());
parser.setSchema(schema);
graph.connect(reader.getOutput(), parser.getInput());
Using ParseTextFields in RushScript
var rawschema= dr.schema()
.nullable(false).trimmed(false) // Want strings in raw form
.STRING("acctNumber")
.STRING("company")
.STRING("startDate")
.STRING("balance");
var rawdata= dr.readDelimitedText({source:..., schema:rawschema});
var schema = dr.schema()
.STRING('company')
.STRING('acctCodes')
.DOUBLE('balance')
.DATE('startDate','MM/dd/yyyy');
var parsed= dr.parseTextFields(rawdata, {schema:schema});
To better illustrate the behavior of the previous code, consider the following source data (all fields are string values).
This yields the data below on the output port. Here values are parsed as the appropriate type for the field. As the balance field is NULL in the source, it is NULL in the output.
Meanwhile, the rejects port will produce the following data, as the startDate field has the wrong format (all fields are string values).
Properties
The
ParseTextFields operator supports the following properties.
Ports
The
ParseTextFields operator provides two output ports.
Using the ReadJSON Operator to Read JSON Text
This topic describes the DataFlow JSON read operator. For information about its KNIME node, see
JSON Reader.
The ReadJSON operator reads a JSON file containing key-value pairs or an array of objects as record tokens. It supports JSON Lines format as described at
http://jsonlines.org/. The formatted text in JSON Lines contains a single JSON record per line. Each record is separated by a newline character.
In JSON, all field keys must start and end with a delimiter. A double quote (") is typically used as the field delimiter. However, you may enable the allowSingleQuotes property to avoid parsing errors when single quotes are used. The ReadJSON operator uses the Jackson JSON parsing library to parse fields.
The reader may optionally specify RecordTextSchema to provide parsing and type information for fields. The schema, in conjunction with any specified field filter, defines the output type of the reader. This can be manually constructed using the provided DataFlow API.
The
StructuredSchemaReader class provides support for reading the DataConnect structured schema descriptors (.schema files) to use with readers. Also, automated schema discovery can be performed based on the contents of the file because JSON text has explicit field markers. The reader provides a pluggable discovery mechanism to support this function. By default, the schema is automatically discovered with the initial assumption that all the fields are strings. The discovered fields are named using the available key fields.
Normally, the reader output includes all parsed records, with or without parsing errors. Fields that cannot be parsed have null values in the output. If required, the reader can be configured to filter the failed records from the output.
JSON text does not contain a header row since the keys in a JSON record define the fields in the output. JSON text files can be parsed in parallel with the "optimistic" assumption that the data is well formatted as per the JSON Lines standard.
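The reason JSON Lines text lends itself to parallel parsing is that a newline always ends a record: a raw newline cannot appear inside a well-formed JSON string, where it must be escaped. A worker that starts at an arbitrary offset can therefore skip ahead to the next newline and begin parsing there. The sketch below demonstrates the idea on an in-memory string. The class JsonLinesSplitDemo and its methods are hypothetical, and the splitting logic used by the ReadJSON operator may differ.

```java
// Illustration only: this class is not part of the DataFlow API.
class JsonLinesSplitDemo {

    // Returns the index of the first record that begins at or after splitOffset.
    static int firstRecordStart(String text, int splitOffset) {
        if (splitOffset <= 0) {
            return 0;
        }
        if (splitOffset >= text.length()) {
            return text.length();
        }
        // A split that begins right after a newline is already on a record boundary.
        if (text.charAt(splitOffset - 1) == '\n') {
            return splitOffset;
        }
        int newline = text.indexOf('\n', splitOffset);
        return newline < 0 ? text.length() : newline + 1;
    }

    // Counts the records whose first character lies in [start, end).
    static int countRecords(String text, int start, int end) {
        int count = 0;
        int pos = firstRecordStart(text, start);
        while (pos < end && pos < text.length()) {
            count++;
            int newline = text.indexOf('\n', pos);
            pos = newline < 0 ? text.length() : newline + 1;
        }
        return count;
    }

    public static void main(String[] args) {
        String text = "{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n";
        // Two splits that cut the second record in half still cover every record once.
        System.out.println(countRecords(text, 0, 10) + countRecords(text, 10, text.length()));
    }
}
```

Each record is owned by the split that contains its first character, so no record is read twice or skipped.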
Code Examples
The first code example shows a simple usage of the reader. The path to the local file name is provided as a parameter to the constructor. This can also be set using the setSource() method. A record type is built and used as the input schema. The record type must be converted to an acceptable schema before it is used by the reader.
Using the ReadJSON operator in Java
The following code shows how to use the ReadJSON operator in Java:
ReadJSON reader = graph.add(new ReadJSON("data/iris.jsonl"));
RecordTokenType schema = record(DOUBLE("sepallength"), DOUBLE("sepalwidth"), DOUBLE("petallength"), DOUBLE("petalwidth"), STRING("class"));
reader.setSchema(TextRecord.convert(schema));
Using the ReadJSON operator in RushScript
The following code shows how to use the ReadJSON operator in RushScript:
var irisSchema = dr.schema().DOUBLE("sepallength").DOUBLE("sepalwidth").DOUBLE("petallength").DOUBLE("petalwidth").STRING("class");
var data = dr.readJSON({source:'data/iris.jsonl', schema: irisSchema});
The following data snippet is from the iris.jsonl file and can be read using the preceding code examples (either Java or RushScript).
{"sepallength":5.1,"sepalwidth":3.5,"petallength":1.4,"petalwidth":0.2,"class":"Iris-setosa"}
{"sepallength":4.9,"sepalwidth":3,"petallength":1.4,"petalwidth":0.2,"class":"Iris-setosa"}
{"sepallength":4.7,"sepalwidth":3.2,"petallength":1.3,"petalwidth":0.2,"class":"Iris-setosa"}
{"sepallength":7,"sepalwidth":3.2,"petallength":4.7,"petalwidth":1.4,"class":"Iris-versicolor"}
{"sepallength":6.4,"sepalwidth":3.2,"petallength":4.5,"petalwidth":1.5,"class":"Iris-versicolor"}
{"sepallength":6.9,"sepalwidth":3.1,"petallength":4.9,"petalwidth":1.5,"class":"Iris-versicolor"}
{"sepallength":6.3,"sepalwidth":3.3,"petallength":6,"petalwidth":2.5,"class":"Iris-virginica"}
{"sepallength":5.8,"sepalwidth":2.7,"petallength":5.1,"petalwidth":1.9,"class":"Iris-virginica"}
{"sepallength":7.1,"sepalwidth":3,"petallength":5.9,"petalwidth":2.1,"class":"Iris-virginica"}
{"sepallength":6.3,"sepalwidth":2.9,"petallength":5.6,"petalwidth":1.8,"class":"Iris-virginica"}
Properties
The
ReadJSON operator supports the following properties.
Name | Type | Description |
allowComments | Boolean | Determines whether the parser will allow using Java or C++ style comments (both '/'+'*' and '//' types) within parsed content or not. Default is False. |
allowUnquotedFieldNames | Boolean | Determines whether the parser will allow using unquoted field names. Default is False. |
allowSingleQuotes | Boolean | Determines whether the parser will allow using single quotes (apostrophe, character '\'') for quoting strings. Default is False. |
allowUnquotedControlChars | Boolean | Determines whether the parser will allow JSON strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed). Default is False. |
allowBackslashEscapingAny | Boolean | Determines whether the parser will allow quoting any character using backslash quoting mechanism. If it is not enabled, only characters that are explicitly listed by JSON specification can be escaped. Default is False. |
allowNumericLeadingZeros | Boolean | Determines whether the parser will allow numbers to start with additional leading zeros. If the leading zeros are allowed for numbers in source, then this field must be set to True. Default is False. |
allowNonNumericNumbers | Boolean | Determines whether the parser is allowed to recognize "Not a Number" (NaN) token as legal floating point values. Default is False. |
analysisDepth | Int | Indicates the number of characters to read for performing schema discovery and structural analysis. |
charset | Charset | Indicates the character set used by the data source. Default is UTF-8. |
charsetName | String | Indicates the character set used by the data source based on the name. |
decodeBuffer | Int | Indicates the buffer size (in bytes) used to decode character data. By default, this is automatically derived using the character set and read buffer size. |
discoveryNullIndicator | String | Indicates the text value used to represent null values by default in discovered schemas. By default, this is the empty string. |
discoveryStringHandling | StringConversion | Indicates the default behavior for processing string-valued types in discovered schemas. |
encoding | | Indicates the properties that control character set encoding. |
errorAction | CodingErrorAction | Determines the action to be performed for errors encoding the input data into the configured character set. The default action replaces the faulty data with a replacement character. |
extraFieldAction | | Determines the action to be performed for fields that are found when parsing the record, but not declared in the schema. |
fieldErrorAction | | Determines the action to be performed for fields that cannot be parsed. |
fieldLengthThreshold | Int | Indicates the maximum length allowed for a field value before it is considered an error. |
includeSourceInfo | Boolean | Determines whether output records will include additional fields that provide origin information for the record. If true, records will have three additional fields: • sourcePath - Path of the file from which the record originates. If this is not known, the value is Null. • splitOffset - Offset of the starting byte of the containing split in the source data. • recordOffset - Offset of the first character of the record text from the start of the containing split. If these names collide with the names defined in the source schema, they will be renamed to avoid collision. These fields are added as the first three of the output and are not affected by the selectedFields property. |
missingFieldAction | | Indicates how to handle fields declared in the schema, but not found when parsing the record. If the configured action does not discard the record, then the missing fields will be null-valued in the output. |
multilineFormat | Boolean | Determines whether the file should be parsed as a multiline JSON file, which allows each JSON record to span multiple lines. Otherwise the data must be in JSON Lines format. Default is False. |
parseErrorAction | | Indicates the action to be performed for all parsing errors. |
parseOptions | | Indicates the parsing options used by the reader. |
pessimisticSplitting | Boolean | Configures whether pessimistic file splitting must be used. By default, this is disabled. Pessimistic splitting defines one file split per file (assumes the input files cannot be split). |
readBuffer | Int | Indicates the size of the I/O buffer (in bytes) to use for reads. Default is 64K. |
readOnClient | Boolean | Determines whether reads are performed by the client or in the cluster. By default, reads are performed in the cluster if executed in a distributed context. |
recordWarningThreshold | Int | Indicates the maximum number of records that can have parse warnings before it fails. |
replacement | String | Indicates the replacement string to be used when encoding error policy is replacement. Default is '?'. |
selectedFields | List<String> | Indicates the list of input fields to be included in the output. This sets a limit for the fields written to the output. |
schema | | Indicates the record schema expected in the JSON source. This property is mutually exclusive with the schemaDiscovery property. If either property is set, the other is ignored. By default, this property is not set. |
schemaDiscovery | | Indicates the schema discovery mechanism to be used. This property is mutually exclusive with the schema property. If either property is set, the other is ignored. By default, a pattern-based mechanism is used. Providing a list of pattern/type pairs uses the default discoverer extended with the supplied patterns. |
source | | Indicates the source of the input data to be parsed as JSON text. |
splitOptions | | Indicates the configuration used to determine how to break the source into splits. |
Ports
The
ReadJSON operator provides a single output port.
Write Operators
Using the WriteAvro Operator to Write Apache Avro Data
The
WriteAvro operator writes a stream of records in
Apache Avro format. Avro is a commonly used binary format offering data compression and the ability to be parsed in parallel. The writer must provide an encoding schema which is serialized with the data, providing the reader with sufficient metadata to decode it.
The operator can be used to write data with a predefined schema or, if one is not provided, generate a schema automatically. When appending to an existing file, the schema encoded in the target is used, ignoring any that were set on the operator itself.
Input fields are mapped to fields in the output schema by name. Any input fields not mapped to an output field will be omitted from the output. Any output fields not mapped to an input field will be null-valued; if this is the case and the field schema does not allow null values (that is, it is not a UNION involving the NULL schema type), the writer will raise an error.
Input mappings must be to a compatible Avro schema type. An error will be raised if a field has an incompatible type. Furthermore, if a field schema does not allow null values but the source field takes on a null value during execution, the writer will fail with an error. Valid mappings are described in the following table:
The remaining Avro schema types are handled as follows:
• RECORD can only be used as the top-level schema provided to the writer; nested records are not allowed. However, special record types representing the DataFlow DATE, TIME, and TIMESTAMP types are used in mapping generated schemas.
• UNION can be used only if one of the component schemas has a valid mapping.
• ARRAY and MAP are unsupported.
When no schema is provided, either from the target or on the operator, one will be generated based on the input data. The resulting schema will have the same field names and ordering as the input record.
Because DataFlow allows a greater range of field identifiers than Avro, a cleansing process must be applied to field names when generating a schema so that the resulting fields are valid. Names are cleansed by replacing any characters which are not alphanumeric or underscore ('_') with the underscore character. If the resulting name does not start with an underscore or alphabetic character, an underscore is prepended to the name. For example, a field named "field#1" is mapped to "field_1", a field named "#1 field" to "_1_field", and a field named "1st field" to "_1st_field". Should the cleansing process yield the same name for two different fields, schema generation will produce an error.
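The cleansing rules described above can be expressed as a short function. The following sketch is one reading of those rules in plain Java, not the code used by the WriteAvro operator. The class AvroNameCleanser is hypothetical, "alphanumeric" is assumed to mean ASCII letters and digits (the characters Avro permits in names), and prefixing an empty name with an underscore is an assumption of this sketch.

```java
// Illustration only: this class is not part of the DataFlow API.
class AvroNameCleanser {

    private static boolean isAsciiLetter(char c) {
        return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z');
    }

    private static boolean isAsciiDigit(char c) {
        return c >= '0' && c <= '9';
    }

    // Replaces every character that is not a letter, digit, or underscore with
    // an underscore, then prepends an underscore if the result does not start
    // with a letter or underscore.
    static String cleanse(String name) {
        StringBuilder cleansed = new StringBuilder(name.length() + 1);
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            boolean valid = isAsciiLetter(c) || isAsciiDigit(c) || c == '_';
            cleansed.append(valid ? c : '_');
        }
        // Assumption: an empty name is also given a leading underscore.
        if (cleansed.length() == 0
                || !(isAsciiLetter(cleansed.charAt(0)) || cleansed.charAt(0) == '_')) {
            cleansed.insert(0, '_');
        }
        return cleansed.toString();
    }

    public static void main(String[] args) {
        System.out.println(cleanse("field#1"));
        System.out.println(cleanse("#1 field"));
        System.out.println(cleanse("1st field"));
    }
}
```

Two distinct names such as "a b" and "a#b" both cleanse to "a_b", which is the collision case that causes schema generation to fail.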
As all DataFlow values are nullable, the generated schema for each field is always a UNION involving NULL plus one other type as indicated by the source field type:
• DOUBLE, FLOAT, LONG, and INT fields are mapped to the primitive Avro types of the same name.
• STRING fields are mapped differently depending on whether they have a FieldDomain associated with them. If they have no domain, they are mapped to the STRING primitive type. If they have a domain, they are mapped to an Avro ENUM type with symbols matching the valid domain.
• BINARY fields are mapped to the BYTES primitive type.
• NUMERIC fields are mapped to the DOUBLE primitive type, with possible loss of precision.
• CHAR fields are mapped to the STRING primitive type.
• DATE fields are mapped to a nested RECORD type consisting of a single LONG field named epochDays.
• TIME fields are mapped to a nested RECORD type consisting of a single INT field named dayMillis.
• TIMESTAMP fields are mapped to a nested RECORD type consisting of three fields: a LONG field named epochSecs, an INT field named subsecNanos, and another INT field named offsetSecs.
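As a rough guide to what the nested DATE, TIME, and TIMESTAMP records hold, the sketch below derives values for each named field from java.time objects. The meaning of each field is inferred from its name and is an assumption: epochDays as days since 1970-01-01, dayMillis as milliseconds since midnight, epochSecs as seconds since the epoch, subsecNanos as the nanosecond part of the second, and offsetSecs as the UTC offset in seconds. The class AvroTemporalFields is hypothetical.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.OffsetDateTime;

// Illustration only: field meanings are inferred from their names.
class AvroTemporalFields {

    // DATE: assumed to be the number of days since 1970-01-01.
    static long epochDays(LocalDate date) {
        return date.toEpochDay();
    }

    // TIME: assumed to be the number of milliseconds since midnight.
    static int dayMillis(LocalTime time) {
        return (int) (time.toNanoOfDay() / 1_000_000L);
    }

    // TIMESTAMP: assumed to be seconds since the epoch, nanoseconds within
    // the second, and the offset from UTC in seconds.
    static long epochSecs(OffsetDateTime timestamp) {
        return timestamp.toEpochSecond();
    }

    static int subsecNanos(OffsetDateTime timestamp) {
        return timestamp.getNano();
    }

    static int offsetSecs(OffsetDateTime timestamp) {
        return timestamp.getOffset().getTotalSeconds();
    }

    public static void main(String[] args) {
        OffsetDateTime ts = OffsetDateTime.parse("2012-03-05T10:15:30.5-06:00");
        System.out.println("epochDays   = " + epochDays(ts.toLocalDate()));
        System.out.println("dayMillis   = " + dayMillis(ts.toLocalTime()));
        System.out.println("epochSecs   = " + epochSecs(ts));
        System.out.println("subsecNanos = " + subsecNanos(ts));
        System.out.println("offsetSecs  = " + offsetSecs(ts));
    }
}
```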
As with all file-based write operators, the
WriteAvro operator will treat its sink as a directory by default. A file-per-application data stream will be output in this case. This can be overridden using the writeSingleSink property.
When writing Avro data, the metadata containing information about the ordering and partitioning of the data may optionally be saved with the data. This can provide a performance improvement when reading the data later since the data may not need to be re-sorted or partitioned again to ensure it meets various requirements of the graph.
Code Examples
The following code example demonstrates using the
WriteAvro operator to write a set of data files, generating a default schema.
Using the WriteAvro operator in Java
// Construct the writer with the path where the results are to be written.
// Use OVERWRITE mode indicating the output file should be overwritten if it exists already.
WriteAvro writer = graph.add(
new WriteAvro("results/test-ratings", WriteMode.OVERWRITE));
// Connect the output of the reader to the input of the writer
graph.connect(operator.getOutput(), writer.getInput());
The following example demonstrates using the operator in RushScript, providing the schema as a file (ratings.avrs) containing the JSON serialization of an Avro Schema object.
Using the WriteAvro operator in RushScript
// Note that this operator doesn't return anything
dr.writeAvro(data, {target:'results/test-ratings', mode:'OVERWRITE', schema:'ratings.avrs'});
Properties
The
WriteAvro operator supports the following properties:
Ports
The
WriteAvro operator provides a single input port.
Using the WriteToJDBC Operator to Write to Databases
In its simplest form the
WriteToJDBC operator writes records from an input port to a target table in a database using a JDBC driver. Data from the input data source will be added to the target table using database insert statements. Constraints may be applied by the database.
Initial and final SQL statements can be provided. The initial statement is run before any input data is handled. The final statement is executed after all input data has been processed.
The syntax of the supported SQL varies from database to database. Using standard SQL is generally a good practice to prevent errors when switching from one database to another.
The JDBC driver for the database being used must be in the Java classpath of the application using this operator. Databases generally provide a jar file that contains the JDBC driver and its dependencies.
Note that the WriteToJDBC operator operates in parallel by default. Each parallel instance will open a separate connection to the target database. To limit the number of database connections used by this operator, tune down the operator parallelism level.
• Within Java, create an
OperatorSettings instance with the maximum parallelism level set to the wanted level. Pass the operator settings instance to the add() method of the composition context within which the operator is being added. See the code example below.
• Within RushScript, set the maxParallelism level property within the property settings of the operator instance. See the code example below.
Code Examples
The following example reads data from a local file and writes the data into a table in MySQL.
Using the WriteToJDBC Operator in Java to append data to a table
// Create an operator settings specifying the maximum number of parallel instances.
// This is optional. It limits the number of database connections made.
OperatorSettings settings = OperatorSettings.MAX_PARALLELISM(8);
// Create the JDBC writer
WriteToJDBC writer = graph.add(new WriteToJDBC(), settings);
writer.setDriverName("com.mysql.jdbc.Driver");
writer.setUrl("jdbc:mysql://dbserver:3306/test");
writer.setUser("test");
writer.setPassword("test");
// Create mapping to database columns
Map<String, String> mappings= new LinkedHashMap<String, String>();
mappings.put("r_user", "user");
mappings.put("r_movieid", "movieid");
mappings.put("r_rating", "rating");
mappings.put("r_timestamp", "timestamp");
writer.setRenameMapping(mappings);
// Have to specify the table name using the JDBC writer
writer.setTableName("ratings");
// Set the SQL statement used to initialize the target table (as needed).
writer.setInitializeTableSQL(
"create table if not exists ratings (r_user integer, r_movieid integer, r_rating integer, r_timestamp varchar(20))");
writer.setSqlWarningLimit(20);
Notes on the code example:
• The target table name is specified. When using
WriteToJDBC the table name property is required.
• The SQL statement used to initialize the target table is specified. The SQL syntax may vary by database vendor.
Using the WriteToJDBC Operator in RushScript to append data to a table
// The SQL statement used to initialize the table
var initSQL = 'create table if not exists ratings (r_user integer, r_movieid integer, r_rating integer, r_timestamp varchar(20))';
// The database field remapping to use
var mapping = {
"r_user":"user",
"r_movieid":"movieid",
"r_rating":"rating",
"r_timestamp":"timestamp"
};
// Create the write to JDBC operator and set properties
dr.writeToJDBC(data, {
driverName:'com.mysql.jdbc.Driver',
url:'jdbc:mysql://dbserver:3306/test',
user:'test',
password:'test',
tableName:'ratings',
initializeTableSQL:initSQL,
renameMapping:mapping,
sqlWarningLimit:20,
maxParallelism:8}); // optionally set the max number of parallel instances wanted
Properties
The
WriteToJDBC operator supports the following properties.
Ports
The
WriteToJDBC operator provides a single input port.
Using the DeleteFromJDBC Operator to Write Database Deletes
The
DeleteFromJDBC operator deletes rows from a target table using the records from an input port to determine which rows to delete. When deleting rows the key fields must be specified in the operator properties. The key field values will be used within the generated database delete statement to determine which rows qualify for deletion.
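The role of the key fields can be illustrated with a small, self-contained sketch. The SQL that DeleteFromJDBC actually generates is not shown in this document, so the statement shape below is an assumption, and the class and method names (DeleteStatementSketch, buildDelete) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch only: the real statement generated by DeleteFromJDBC
// is not documented here and may differ.
class DeleteStatementSketch {

    // Builds a parameterized DELETE whose WHERE clause tests every key field.
    static String buildDelete(String tableName, List<String> keyNames) {
        if (keyNames.isEmpty()) {
            throw new IllegalArgumentException("at least one key field is required");
        }
        List<String> predicates = new ArrayList<>();
        for (String key : keyNames) {
            predicates.add(key + " = ?");
        }
        return "DELETE FROM " + tableName + " WHERE " + String.join(" AND ", predicates);
    }

    public static void main(String[] args) {
        // One statement is prepared; each input record supplies the key values.
        System.out.println(buildDelete("ratings", Arrays.asList("r_user")));
    }
}
```

In this sketch each input record would bind its key values to the placeholders, so only rows matching all keys qualify for deletion.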
This operator provides an output port that will include the data used by the delete operation and an additional column containing the status codes returned by the database for each delete statement.
Initial and final SQL statements can be provided. The initial statement is run before any input data is handled. The final statement is executed after all input data has been processed.
The syntax of the supported SQL varies from database to database. Using standard SQL is generally a good practice to prevent errors when switching from one database to another.
The JDBC driver for the database being used must be in the Java classpath of the application using this operator. Databases generally provide a jar file that contains the JDBC driver and its dependencies.
Note that the DeleteFromJDBC operator operates in parallel by default. Each parallel instance will open a separate connection to the target database. To limit the number of database connections used by this operator, tune down the operator parallelism level.
• Within Java, create an
OperatorSettings instance with the maximum parallelism level set to the desired level. Pass the operator settings instance to the add() method of the composition context within which the operator is being added.
• Within RushScript, set the maxParallelism level property within the property settings of the operator instance.
Code Examples
The following example is used to delete rows from a database table. The input data to the operator must contain the key values required to remove data from the target table. This example also demonstrates collecting the output so that the status codes can be recorded.
Using the DeleteFromJDBC operator in Java to delete rows from a table
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
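import com.pervasive.datarush.operators.io.WriteMode;
import com.pervasive.datarush.operators.io.jdbc.DeleteFromJDBC;
import com.pervasive.datarush.operators.io.jdbc.OutputMode;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;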
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;
public class WriteMySQL {
public static void main(String[] args) {
// Create an empty logical graph
LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("WriteDelete");
// Create a delimited text reader for the "ratings.txt" file
ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
reader.setFieldSeparator("::");
reader.setHeader(true);
RecordTokenType ratingsType = record(INT("r_user"), INT("r_movieid"), INT("r_rating"), STRING("r_timestamp"));
reader.setSchema(TextRecord.convert(ratingsType));
// Create JDBC writer
DeleteFromJDBC writer = graph.add(new DeleteFromJDBC());
writer.setDriverName("com.mysql.jdbc.Driver");
writer.setUrl("jdbc:mysql://dbserver:3306/test");
writer.setUser("root");
writer.setPassword("localadmin");
writer.setTableName("ratings");
// You have to specify the key field names.
writer.setKeyNames(new String[] {"r_user"});
writer.setSqlWarningLimit(20);
// Create a delimited text writer to collect the status codes
WriteDelimitedText logger = graph.add(new WriteDelimitedText("ratings_delete_status.txt", WriteMode.OVERWRITE));
logger.setWriteSingleSink(true);
logger.setHeader(true);
// Connect operators
graph.connect(reader.getOutput(), writer.getInput());
graph.connect(writer.getOutput(), logger.getInput());
// Compile and run the graph
graph.run();
}
}
Using the DeleteFromJDBC Operator in RushScript to delete data from a table
// Create the delete from JDBC operator and set properties
dr.deleteFromJDBC(data, {
driverName:'com.mysql.jdbc.Driver',
url:'jdbc:mysql://dbserver:3306/test',
user:'root',
password:'localadmin',
tableName:'ratings',
keyNames:["r_user"],
sqlWarningLimit:20,
maxParallelism:8}); // optionally set the max number of parallel instances wanted
Properties
The
DeleteFromJDBC operator supports the following properties.
Ports
The
DeleteFromJDBC operator provides a single input port.
The
DeleteFromJDBC operator provides a single output port.
Using the UpdateInJDBC Operator to Write Database Updates
The
UpdateInJDBC operator updates rows in a target table using records from an input port to determine which columns to update. When updating rows the key fields must be specified in the operator properties. The key field values will be used within the generated database update statement to determine which rows qualify for an update. Additionally the update fields may be specified in the operator properties. If the fields to update are not set all non-key fields will be used for the update.
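The defaulting rule for update fields can be sketched in plain Java. This is an illustration under stated assumptions, not the operator's implementation: the statement shape is assumed, and the names UpdateStatementSketch, resolveUpdateFields, and buildUpdate are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch only: the SQL generated by UpdateInJDBC may differ.
class UpdateStatementSketch {

    // Uses the explicit update fields if given; otherwise all non-key input fields.
    static List<String> resolveUpdateFields(List<String> inputFields,
                                            List<String> keyNames,
                                            List<String> updateFields) {
        if (updateFields != null && !updateFields.isEmpty()) {
            return updateFields;
        }
        List<String> nonKeys = new ArrayList<>(inputFields);
        nonKeys.removeAll(keyNames);
        return nonKeys;
    }

    // Builds a parameterized UPDATE: SET covers the update fields, WHERE tests the keys.
    static String buildUpdate(String tableName, List<String> inputFields,
                              List<String> keyNames, List<String> updateFields) {
        List<String> assignments = new ArrayList<>();
        for (String field : resolveUpdateFields(inputFields, keyNames, updateFields)) {
            assignments.add(field + " = ?");
        }
        List<String> predicates = new ArrayList<>();
        for (String key : keyNames) {
            predicates.add(key + " = ?");
        }
        return "UPDATE " + tableName + " SET " + String.join(", ", assignments)
                + " WHERE " + String.join(" AND ", predicates);
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("r_user", "r_movieid", "r_rating", "r_timestamp");
        List<String> keys = Arrays.asList("r_user", "r_movieid");
        System.out.println(buildUpdate("ratings", input, keys, Arrays.asList("r_rating")));
        System.out.println(buildUpdate("ratings", input, keys, null));
    }
}
```

The second call leaves the update fields unset, so the sketch falls back to every non-key field.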
This operator provides an output port that will include the data used by the update operation and an additional column containing the status codes returned by the database for each update statement.
Initial and final SQL statements can be provided. The initial statement is run before any input data is handled. The final statement is executed after all input data has been processed.
The syntax of the supported SQL varies from database to database. Using standard SQL is generally a good practice to prevent errors when switching from one database to another.
The JDBC driver for the database being used must be in the Java classpath of the application using this operator. Databases generally provide a .jar file that contains the JDBC driver and its dependencies.
Note that the UpdateInJDBC operator operates in parallel by default. Each parallel instance will open a separate connection to the target database. To limit the number of database connections used by this operator, tune down the operator parallelism level.
• Within Java, create an
OperatorSettings instance with the maximum parallelism level set to the desired level. Pass the operator settings instance to the add() method of the composition context within which the operator is being added.
• Within RushScript, set the maxParallelism level property within the property settings of the operator instance.
Code Examples
The following example shows how to use the input data of the
UpdateInJDBC operator to update database rows. The "r_user" and "r_movieid" fields from the ratings data are used as keys. The rating value is updated. This example also demonstrates collecting the output so the status codes can be recorded.
Using the UpdateInJDBC operator to update database rows
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
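import com.pervasive.datarush.operators.io.WriteMode;
import com.pervasive.datarush.operators.io.jdbc.OutputMode;
import com.pervasive.datarush.operators.io.jdbc.UpdateInJDBC;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;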
import com.pervasive.datarush.operators.record.SelectFields;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;
public class UpdateMySQL {
public static void main(String[] args) {
// Create an empty logical graph
LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("WriteMySQL");
// Create a delimited text reader for the "ratings.txt" file
ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
reader.setFieldSeparator("::");
reader.setHeader(true);
RecordTokenType ratingsType = record(INT("r_user"), INT("r_movieid"), INT("r_rating"), STRING("r_timestamp"));
reader.setSchema(TextRecord.convert(ratingsType));
// Create JDBC writer
UpdateInJDBC writer = graph.add(new UpdateInJDBC());
writer.setDriverName("com.mysql.jdbc.Driver");
writer.setUrl("jdbc:mysql://dbserver:3306/test");
writer.setUser("root");
writer.setPassword("localadmin");
writer.setTableName("ratings");
// Specify the names of the key fields used in the SQL update commands.
writer.setKeyNames(new String[] {"r_user", "r_movieid"});
writer.setSqlWarningLimit(20);
// Specify the name of the field that should be updated
writer.setUpdateFields(new String[] {"r_rating"});
// Create a delimited text writer to collect the status codes
WriteDelimitedText logger = graph.add(new WriteDelimitedText("ratings_update_status.txt", WriteMode.OVERWRITE));
logger.setWriteSingleSink(true);
logger.setHeader(true);
// Connect operators
graph.connect(reader.getOutput(), writer.getInput());
graph.connect(writer.getOutput(), logger.getInput());
// Compile and run the graph
graph.run();
}
}
Using the UpdateInJDBC Operator in RushScript to update data in a table
// Create the update in JDBC operator and set properties
dr.updateInJDBC(data, {
driverName:'com.mysql.jdbc.Driver',
url:'jdbc:mysql://dbserver:3306/test',
user:'root',
password:'localadmin',
tableName:'ratings',
keyNames:["r_user", "r_movieid"],
updateFields:["r_rating"],
sqlWarningLimit:20,
maxParallelism:8}); // optionally set the max number of parallel instances wanted
Properties
The
UpdateInJDBC operator supports the following properties.
Ports
The
UpdateInJDBC operator provides a single input port.
The
UpdateInJDBC operator provides a single output port.
Using the WriteDelimitedText Operator to Write Delimited Text
The
WriteDelimitedText operator writes a stream of records as delimited text. The writer performs no reordering or filtering of the fields. If this is desired, the stream should be preprocessed with the appropriate operator.
Delimited text supports up to three distinct user-defined sequences within a record, used to identify field boundaries:
• A field separator: found between individual fields. By default this is a comma character.
• A field start delimiter: marking the beginning of a field value. By default this is the double quote character.
• A field end delimiter: marking the end of a field value. By default this is the double quote character.
The field separator cannot be empty. The start and end delimiters can be the same value. They can also both (but not individually) be empty, signifying the absence of field delimiters. The writer will always delimit fields, regardless of whether the values require it. Should a delimited field need to contain the end delimiter, it is escaped from its normal interpretation by duplicating it. For instance, the value ab"c will be formatted as "ab""c".
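The delimiting and escaping rules can be illustrated with a short, self-contained sketch. It is not the WriteDelimitedText implementation; the class and method names (DelimitedFieldSketch, delimit, joinRecord) are hypothetical and only show how doubling the end delimiter works.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the rules described above;
// this is not the WriteDelimitedText implementation.
class DelimitedFieldSketch {

    // Wraps a value in start/end delimiters, doubling any end delimiter inside it.
    static String delimit(String value, String start, String end) {
        if (start.isEmpty() != end.isEmpty()) {
            throw new IllegalArgumentException("delimiters must both be empty or both be set");
        }
        if (end.isEmpty()) {
            return value; // no field delimiters in use
        }
        return start + value.replace(end, end + end) + end;
    }

    // Formats one record: every field is delimited, then joined by the separator.
    static String joinRecord(List<String> values, String separator, String start, String end) {
        if (separator.isEmpty()) {
            throw new IllegalArgumentException("field separator cannot be empty");
        }
        List<String> fields = new ArrayList<>();
        for (String value : values) {
            fields.add(delimit(value, start, end));
        }
        return String.join(separator, fields);
    }

    public static void main(String[] args) {
        // Prints "ab""c","42" using the default comma and double quote.
        System.out.println(joinRecord(Arrays.asList("ab\"c", "42"), ",", "\"", "\""));
    }
}
```

A reader that applies the inverse rule (collapsing a doubled end delimiter back to one) recovers the original value.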
The writer accepts a
RecordTextSchema to provide formatting information for fields, as well as header row information. It is not required to provide a schema; however, one can be generated from the input data type using default formatting based on the data type. The writer also supports a pluggable discovery mechanism for creating a schema based on the input type, should fine-grained dynamic control be required.
Any schema, supplied or discovered, must be compatible with the input to the writer. To be compatible, a schema must contain a field definition with an assignable type for each field named in the input. Fields present in the schema but not in the input are permitted; each such missing field is assumed to have a null value.
Delimited text data may or may not have a header row. The header row is delimited as usual but contains the names of the fields in the data portion of the record. The writer will emit a header row if it should be present and the write is not appending to an existing file.
When writing delimited text data, the metadata containing information about the ordering and partitioning of the data may optionally be saved with the data. This can provide a performance improvement when reading the data later since the data may not need to be re-sorted or partitioned again to ensure it meets various requirements of the graph.
As with all file-based write operators, the WriteDelimitedText operator will treat its sink as a directory by default. In this case, one file is written per application data stream. This can be overridden using the writeSingleSink property.
Code Examples
The following code example demonstrates using the
WriteDelimitedText operator to write a set of data files.
Using the WriteDelimitedText operator in Java
// Construct the writer with the path where the results are to be written.
// Use OVERWRITE mode indicating the output file should be overwritten if it exists already.
WriteDelimitedText writer = graph.add(
new WriteDelimitedText("results/test-ratings", WriteMode.OVERWRITE));
// No start/end field delimiter wanted
writer.setFieldDelimiter("");
// Output a header row at the beginning of the file with field names
writer.setHeader(true);
// Connect the output of the reader to the input of the writer
graph.connect(operator.getOutput(), writer.getInput());
Using the WriteDelimitedText operator in RushScript
// Note that this operator doesn't return anything
dr.writeDelimitedText(data, {target:'results/test-ratings', mode:'OVERWRITE', fieldDelimiter:'', header:true});
Note that the writeSingleSink property was not set. The property defaults to false. Running this application will result in multiple files being written into the target directory. The files will be named "part%n" where %n is replaced with the partition number. The number of files written will correspond to the parallelism configured for the DataFlow engine.
The partial screen shot below shows the results directory that contains the output of the sample application. Eight part files were written since the engine parallelism level was set to eight. Writing files in this manner is optimal, as the individual data streams can be written in parallel.
If for some reason the data is needed in a single file, enable the writeSingleSink property. The following code fragment amends the example above to add the setting of this property. The output will be a single file containing all of the resultant data. Performance will be adversely affected as the data must be gathered into a single stream for writing.
Using a single sink
// Create a delimited text writer
WriteDelimitedText writer = graph.add(new WriteDelimitedText("results/test-ratings.txt", WriteMode.OVERWRITE));
writer.setFieldDelimiter("");
writer.setHeader(true);
writer.setWriteSingleSink(true);
Properties
The
WriteDelimitedText operator supports the following properties.
Ports
The
WriteDelimitedText operator provides a single input port.
Using the WriteFixedText Operator to Write Fixed-width Text
The WriteFixedText operator writes a record dataflow as a text file of fixed-width records. The writer requires a schema (FixedWidthTextRecord) to determine how field values are positioned and formatted. Fields are mapped by name; all fields defined in the schema must be present in the input (and vice versa), although they need not be in the same order. The order of fields in the formatted record is determined by the order in the supplied schema. Records in the file are separated by a non-empty, user-defined character sequence.
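How a field value is positioned within a fixed-width slot can be sketched without the DataFlow library. The helper below is a hypothetical illustration (FixedWidthSketch and pad are not DataFlow names), and rejecting over-long values is an assumption; the operator's actual behavior for values wider than the field is not described here.

```java
// Hypothetical sketch of fixed-width field formatting; not the DataFlow API.
class FixedWidthSketch {

    enum Alignment { LEFT, RIGHT }

    // Pads a value with padChar to exactly the given width, on the side opposite the alignment.
    static String pad(String value, int width, char padChar, Alignment alignment) {
        if (value.length() > width) {
            // Assumption: over-long values are rejected rather than truncated.
            throw new IllegalArgumentException("value exceeds field width " + width);
        }
        StringBuilder padding = new StringBuilder();
        for (int i = value.length(); i < width; i++) {
            padding.append(padChar);
        }
        return alignment == Alignment.LEFT ? value + padding : padding + value;
    }

    public static void main(String[] args) {
        // Fields are concatenated in schema order with no separator between them.
        String record = pad("01-000667", 9, ' ', Alignment.LEFT)
                + pad("OH", 2, ' ', Alignment.LEFT)
                + pad("15.89", 6, ' ', Alignment.LEFT);
        System.out.println("[" + record + "]");
    }
}
```

Because every field occupies a fixed number of characters, a reader can locate each value by offset alone.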
The record schema can be manually constructed using the API provided, though this metadata is often persisted externally. Support for Pervasive Data Integrator structured schema descriptors (.schema files) is provided by the
StructuredSchemaReader class.
Code Examples
The following example uses the
WriteFixedText operator to write a file in fixed-width format.
Using the WriteFixedText operator in Java
// Define the schema for the account data. This schema can be used
// for reading and writing the account data.
FixedWidthTextRecord schema = new FixedWidthTextRecord(new TextConversionDefaults(StringConversion.NULLABLE_TRIMMED));
schema.defineField("accountNumber", new PaddedTextType(TextTypes.STRING, 9, ' ', Alignment.LEFT));
schema.defineField("name", new PaddedTextType(TextTypes.STRING, 21, ' ', Alignment.LEFT));
schema.defineField("companyName", new PaddedTextType(TextTypes.STRING, 31, ' ', Alignment.LEFT));
schema.defineField("address", new PaddedTextType(TextTypes.STRING, 35, ' ', Alignment.LEFT));
schema.defineField("city", new PaddedTextType(TextTypes.STRING, 16, ' ', Alignment.LEFT));
schema.defineField("state", new PaddedTextType(TextTypes.STRING, 2, ' ', Alignment.LEFT));
schema.defineField("zip", new PaddedTextType(TextTypes.STRING, 10, ' ', Alignment.LEFT));
schema.defineField("emailAddress", new PaddedTextType(TextTypes.STRING, 25, ' ', Alignment.LEFT));
schema.defineField("birthDate", new PaddedTextType(TextTypes.FORMATTED_DATE(new SimpleDateFormat("MM/dd/yyyy")), 10, ' ', Alignment.LEFT));
schema.defineField("accountCodes", new PaddedTextType(TextTypes.STRING, 11, ' ', Alignment.LEFT));
schema.defineField("standardPayment", new PaddedTextType(TextTypes.JAVA_DOUBLE, 6, ' ', Alignment.LEFT));
schema.defineField("payment", new PaddedTextType(TextTypes.JAVA_DOUBLE, 7, ' ', Alignment.LEFT));
schema.defineField("balance", new PaddedTextType(TextTypes.JAVA_DOUBLE, 6, ' ', Alignment.LEFT));
// Create fixed text writer
WriteFixedText writer = graph.add(new WriteFixedText("results/accounts-fixedwidth.txt", WriteMode.OVERWRITE));
writer.setSchema(schema);
writer.setWriteSingleSink(true);
Here is a fragment of the data output by the example
WriteFixedText operator.
01-000667George P Schell Market Place Products 334 Hilltop Dr Mentor OH44060-1930warmst864@aol.com 02/28/1971XA 101.0 100.0 15.89
01-002423Marc S Brittan Madson & Huth Communication Co 5653 S Blackstone Avenue, #3E Chicago IL60637-4596mapper@tcent.net 06/30/1975BA 144.0 144.0 449.92
01-006063Stephanie A Jernigan La Salle Clinic 77565 Lorain Akron OH44325-4002dram@akron.net 11/02/1941EB|CB 126.0 126.0 262.98
01-010474Ernie Esser Town & Country Electric Inc. 56 Pricewater Waltham MA2453 hazel@bentley.net 12/15/1962JA|RB 127.0 127.0 271.75
01-010852Robert A Jacoby Saturn of Baton Rouge 4001 Lafayette Baton Rouge LA70803-4918din33@norl.com 12/22/1985ED|EA|RB|KA142.0 150.0 423.01
01-011625James C Felli Bemiss Corp. 23A Carolina Park Circle Spartanburg SC29303-9398cadair@gw.com 02/21/1940SB 151.0 155.0 515.41
01-018448Alan W Neebe Georgia State Credit Union PO Box 159 Demorest GA30535-1177delores@truett.com 01/31/1960MA|ED|SB 113.0 120.0 131.89
01-018595Alexander Gose Office Support Services 436 Green Mountain Circle New Paltz NY12561-0023dams@matrix.net 06/19/1940EC 147.0 147.0 477.09
The following example demonstrates writing fixed-width data using RushScript.
Using the WriteFixedText operator in RushScript
var accountsFixedSchema = dr.schema({type:'FIXED'})
.nullable(true)
.trimmed(true)
.padChar(' ')
.alignment('LEFT')
.STRING("accountNumber", {size:9})
.STRING("clientName", {size:21})
.STRING("companyName", {size:31})
.STRING("streetAddress", {size:35})
.STRING("city", {size:16})
.STRING("state", {size:2})
.STRING("zip", {size:10})
.STRING("emailAddress", {size:25})
.DATE("birthDate", {pattern:'MM/dd/yyyy', size:10})
.STRING("accountCodes", {size:11})
.DOUBLE("standardPayment", {pattern:'0.00', size:6})
.DOUBLE("payment", {pattern:'0.00', size:7})
.DOUBLE("balance", {pattern:'0.00', size:6});
// Write fixed-width data
dr.writeFixedText(
data,
{
target:'/path/to/file.txt',
mode:'OVERWRITE',
schema:accountsFixedSchema
});
Properties
The
WriteFixedText operator supports the following properties.
Ports
The
WriteFixedText operator provides a single input port.
Using the WriteSink Operator to Write Generic Targets
The
WriteSink operator writes a stream of records to a data sink. The sink accepts a sequence of bytes that represent formatted records, identical in logical structure. The mapping between physical and logical structure is encapsulated in a format descriptor, which must be provided.
This operator is low level, providing a generalized model for writing files in a distributed fashion. Typically,
WriteSink is not directly used in a graph. Instead, it is indirectly used through a composite operator such as one derived from
AbstractWriter, providing a more appropriate interface to the end user.
Parallelized writes are supported by creating multiple output sinks from the original target, called fragments, each representing the portion of data on one partition. In the case of a write to a file system, this would result in a directory of files. To write in parallel the target must support the concept of fragmenting. If a target does not support fragmenting the write is forced to be non-parallel.
A port is provided that signals completion of the writer. This can be used to express a dependency between operators based on the target having been successfully written.
The writer makes a best-effort attempt to validate the target before execution, but cannot always guarantee correctness depending on the nature of the target. This is done to try to prevent failures that may result in the loss of a significant amount of work, such as when misconfigured graphs execute in a late phase.
Tip: This is a low-level operator that typically is not directly used. It can be used with a custom data format. A custom data format may be needed to support a format not provided by the DataFlow library.
Code Example
This example code fragment demonstrates how to set up a writer for a generic file type.
Using the WriteSink operator
WriteSink writer = new WriteSink();
writer.setTarget(new BasicByteSink("filesink"));
writer.setMode(WriteMode.OVERWRITE);
writer.setFormat(new DelimitedTextFormat(TextRecord.convert(record(INT("intfield"),STRING("stringfield"))),
new FieldDelimiterSettings(),
new CharsetEncoding()));
Properties
The
WriteSink operator supports the following properties.
Ports
The
WriteSink operator provides a single input port.
Using the WriteStagingDataset Operator to Write Staging Data Sets
The
WriteStagingDataset operator writes a sequence of records to disk in an internal format for staged data. Staged data sets are useful as they are more efficient than text files, being stored in a compact binary format. If a set of data must be read multiple times, significant savings can be achieved by converting it into a staging data set first.
It is generally best to perform parallel writes, creating multiple files. This allows reads to be parallelized effectively, as the staging format is not splittable.
There are several use cases for directly using staging data sets. A few examples are:
• A particular task to be accomplished must be broken into several DataFlow applications. Data from each application must be passed to the next. Staging data sets can be used to capture the intermediate data, allowing for quick and easy access.
• Data within a database is being used to build a predictive model. This development will require many iterations of algorithm testing with the data.
For this use case, the most efficient route is to first read the data from the source database and write it to staging data sets. Then use the staged data as input into the predictive analytic development. The one-time read from the database is overhead. However, given that the source data will be read many times, the savings will be substantial.
Code Examples
The following code example reads from a delimited text file and writes the data to a staging data set.
Using the WriteStagingDataset operator in Java
// Write to a staging dataset overwriting existing content
WriteStagingDataset datasetWriter = graph.add(
new WriteStagingDataset("results/ratings-stage", WriteMode.OVERWRITE));
Using the WriteStagingDataset operator in RushScript
// Note: no return value
dr.writeStagingDataset(data, {target:'results/ratings-stage', mode:'OVERWRITE'});
The following partial screen shot shows the results directory that contains the output of the sample application. Eight part files were written since the engine parallelism level was set to eight. Writing files in this manner is optimal, as the individual data streams can be written in parallel. This is recommended for staging data sets because the format is not splittable. Having a single file output limits the parallelism when reading.
Properties
The
WriteStagingDataset operator supports the following properties.
Ports
The
WriteStagingDataset operator provides a single input port.
Using the WriteARFF Operator to Write Sparse Data
The
WriteARFF operator writes files using the Attribute-Relation File Format (ARFF). ARFF supports both sparse and dense formats. The format to use is specified with the format mode property. The default mode is sparse. When the input data is highly sparse, the sparse mode can save space since it only writes non-zero values. However, if the data is not very sparse, using sparse mode can actually cause the resultant file to be larger. Use dense mode for dense data.
Note that when writing sparse data that contains enumerated types, the zero ordinal category will always be treated as sparse and so will not appear in the output file. This is normal and expected behavior. The enumerated type is captured in the metadata. When the file is read, the zero ordinal values will be restored as expected.
ARFF provides a metadata section at the start of the file. The metadata may contain several items:
• Comments: free-form, user-supplied comments.
• Relation name: a user-supplied name for the relation contained within the ARFF file.
• Attribute definitions: defines the field name and type of each data field. The attribute definitions define the schema of the file. The definitions are generated by the operator.
ARFF is useful when the input data is sparse. Because ARFF includes metadata such as the file schema, the field names and types travel with the data, which makes the format convenient to use. The dense format is a simple CSV format and as such is very similar to the format written by the WriteDelimitedText operator.
Code Examples
The following example writes an ARFF file in sparse mode.
Using the WriteARFF operator in Java
// Write to an ARFF file in sparse mode
WriteARFF arffWriter = graph.add(new WriteARFF("results/ratings-arff.txt", WriteMode.OVERWRITE));
arffWriter.setFormatMode(ARFFMode.SPARSE);
arffWriter.setRelationName("Movie Ratings");
arffWriter.addComment("This is an ARFF file written using DataFlow");
arffWriter.setWriteSingleSink(true);
Notes from the example:
• The ARFF mode is set to sparse. This enables writing the data in sparse format. Any numeric data values equal to zero will not be written to the output, thereby saving space. When the data is sparse, using the sparse format is very efficient.
• The relation name is set. This is optional. The relation name is part of the ARFF metadata. It is used by consumers of ARFF files to distinguish the data set.
• A comment is added. This is also optional. Comments are added at the beginning of the ARFF metadata section. When reading an ARFF file the comments can also be retrieved. There is no practical limit to the number of comment lines that may be added.
• The writeSingleSink property is enabled. This option forces the writing of a single output file. The default is to write multiple files in parallel.
Using the WriteARFF operator in RushScript
dr.writeARFF(data, {
target:'results/ratings-arff.txt',
mode:'OVERWRITE',
formatMode:'SPARSE',
relationName:'Movie Ratings',
comments:'This is an ARFF file written using DataFlow',
writeSingleSink:true});
The following are the first few lines of the ARFF file written with the code example. The metadata appears at the top of the file. Metadata statements begin with the "@" character. Comments begin with the "%" character. The data follows the @data statement. Data rows in sparse format are wrapped with curly braces. Entries within a row are comma-separated. Each entry consists of the field index followed by the data value.
% This is an ARFF file written using DataFlow
@relation 'Movie Ratings'
@attribute userID integer % int32
@attribute movieID integer % int32
@attribute rating integer % int32
@attribute timestamp string
@data
{0 1,1 1193,2 5,3 978300760}
{0 1,1 661,2 3,3 978302109}
{0 1,1 914,2 3,3 978301968}
{0 1,1 3408,2 4,3 978300275}
{0 1,1 2355,2 5,3 978824291}
{0 1,1 1197,2 3,3 978302268}
{0 1,1 1287,2 5,3 978302039}
{0 1,1 2804,2 5,3 978300719}
{0 1,1 594,2 4,3 978302268}
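The difference between the sparse and dense row layouts can be sketched in a few lines of plain Java. This is a simplified illustration that handles integer values only; the class and method names (SparseRowSketch, toSparseRow, toDenseRow) are hypothetical and are not part of WriteARFF.

```java
// Hypothetical sketch of ARFF row layouts for integer data; not the WriteARFF implementation.
class SparseRowSketch {

    // Sparse layout: "{index value,...}" with zero values omitted.
    static String toSparseRow(int[] values) {
        StringBuilder row = new StringBuilder("{");
        for (int i = 0; i < values.length; i++) {
            if (values[i] == 0) {
                continue; // zeros are implied by their absence
            }
            if (row.length() > 1) {
                row.append(',');
            }
            row.append(i).append(' ').append(values[i]);
        }
        return row.append('}').toString();
    }

    // Dense layout: every value written, comma-separated.
    static String toDenseRow(int[] values) {
        StringBuilder row = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                row.append(',');
            }
            row.append(values[i]);
        }
        return row.toString();
    }

    public static void main(String[] args) {
        int[] mostlyZero = {0, 7, 0, 0, 0, 3};
        System.out.println(toSparseRow(mostlyZero)); // {1 7,5 3}
        System.out.println(toDenseRow(mostlyZero));  // 0,7,0,0,0,3
    }
}
```

For a row with no zeros, such as the ratings rows above, the sparse form is longer than the dense form because every value also carries its index. This is why dense mode suits data that is not very sparse.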
Properties
The
WriteARFF operator supports the following properties.
Ports
The
WriteARFF operator provides a single input port.
Using the ForceRecordStaging Operator to Explicitly Stage Data
Using the
ForceRecordStaging operator forces the operators on the input and output sides of the operator to execute sequentially, instead of concurrently. In a dataflow graph, consumers and producers run simultaneously, exchanging data as the operators are ready. This permits the graph to exploit pipeline parallelism. The data is staged to disk in configured temporary storage spaces.
In some cases, it may be necessary to avoid this concurrency. The simplest example is:
• Operator A produces data.
• Operator B consumes data from A, producing data only when all data from A has been consumed. (Calculating a total, for instance.)
• Operator C consumes the output of both A and B, but does not consume any data from A until after consuming data from B.
In this case, a memory usage dilemma, known as queue expansion, arises. B needs all of A's data, but C does not read anything from A until B produces some data. As the data unread by C cannot be discarded, it must be saved until it is read. To minimize memory requirements, the lag between readers is limited. This limit is raised when required, but in this case, it requires the entire output of A, which could easily exceed memory.
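The buildup can be made concrete with a toy, single-threaded simulation. It is only a sketch of the A/B/C scenario above, not of the DataFlow engine; the class and method names (QueueExpansionSketch, peakBufferedForC) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of queue expansion; it does not reflect the DataFlow engine's internals.
class QueueExpansionSketch {

    // Returns the peak number of A's records that must be held for C.
    static int peakBufferedForC(int recordCount) {
        Deque<Integer> pendingForC = new ArrayDeque<>();
        long total = 0; // B's running aggregate
        int peak = 0;

        for (int record = 1; record <= recordCount; record++) {
            total += record;         // B consumes each record from A as it arrives
            pendingForC.add(record); // C has not started reading A, so the record is held
            peak = Math.max(peak, pendingForC.size());
        }

        // Only now can B emit its result; C reads it and then drains A's backlog.
        System.out.println("B emits total " + total);
        while (!pendingForC.isEmpty()) {
            pendingForC.poll();
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("Peak records held for C: " + peakBufferedForC(1000));
    }
}
```

In this model the backlog held for C grows to the full size of A's output, which is the situation that staging to disk is meant to handle.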
Usually, this is automatically handled by the framework, saving data to disk. This is done in an optimistic fashion, avoiding disk I/O and associated performance degradation until necessary at the cost of a (small) potential risk of graph failure due to memory being exhausted. This operator provides a pessimistic approach to the problem that forces all data to disk, with all the associated performance costs.
Tip: Use the ForceRecordStaging operator directly in situations where it is known that queue expansion will occur. Direct usage prevents the overhead of handling queue expansion.
Code Example
The code example provides a common use case requiring use of the
ForceRecordStaging operator. A data source is aggregated using the
Group operator. Aggregation causes a reduction of the input data. The output of
Group is then wired into a
Join operator. This allows joining the aggregated results back to the original data. However, only a few key groups are known to exist. This implies the join will consume large amounts of the original source while waiting for data from the group. Forcing the data to be staged prevents queue expansion and the overhead of handling the expansion at run time.
Note that the example is not complete: code to produce the source is not provided, and properties are not set on the group and join operators.
Using the ForceRecordStaging operator
// Aggregate the source data
Group group = graph.add(new Group());
graph.connect(source, group.getInput());
// Force staging of source data to prevent queue expansion.
// The group operator reduces the input row count considerably.
ForceRecordStaging forceStaging = graph.add(new ForceRecordStaging());
graph.connect(source, forceStaging.getInput());
// Join the original data to the aggregated results. Use the staged data
// instead of the original source to prevent queue expansion.
Join join = graph.add(new Join());
graph.connect(forceStaging.getOutput(), join.getLeft());
graph.connect(group.getOutput(), join.getRight());
Using the ForceRecordStaging operator in RushScript
// Force a dataset to be staged
var stagedData = dr.forceRecordStaging(data);
Properties
The
ForceRecordStaging operator has no properties.
Ports
The
ForceRecordStaging operator provides a single input port.
The
ForceRecordStaging operator provides a single output port.
Using the WriteORC Operator to Write Apache ORC Files
The
WriteORC operator writes data files in the
Apache Optimized Row Columnar (ORC) format. The ORC format is supported by Apache Hive.
The operator is registered as
writeORC for use with JSON or scripting. This operator is based on the
AbstractWriter operator and supports the features of the base class, particularly parallel writing. Each part file created in this way is a logical ORC file. Similar to the
ReadORC operator, the WriteORC operator can access Hive libraries using the shim layer. The operator uses the
ORCFile or Writer API.
Note: The operator requires a Hadoop module configuration to be enabled even if the workflow does not run on the cluster or access HDFS.
For more information about ORC format, go to the
ORC file format manual.
A few DataFlow data types, such as CHAR and TIME, are not supported for writing because they do not have a matching ORC type. If the schema contains an unsupported data type, the WriteORC operator throws an exception when the graph is compiled.
In general, DataFlow types are assigned to the ORC schema types as mentioned in the following table.
Properties
The
WriteORC operator provides the following properties that are derived directly from
WriterOptions of the ORC.
Note: Before running the workflow, ensure that the client configuration and the jar files are added to the classpath. For more information, see
Integrating DataFlow with Hadoop.
Code Examples
The following example writes ORC file data using Java.
Using the WriteORC operator in Java
WriteORC writer = graph.add(new WriteORC("hdfs://localhost:9000/dforc.orc"));
writer.setMode(WriteMode.OVERWRITE);
writer.setCompression(ORCCompression.SNAPPY);
writer.setBlockPadding(true);
The following example writes ORC file data using RushScript.
Using the WriteORC operator in RushScript
var writer = dr.writeORC("orc-out", source, {target:"file:///data/dforc.orc", writeSingleSink:true});
Port
The
WriteORC operator provides a single input port.
Database Loaders
Database loaders provide the ability to transfer data into a database quickly and efficiently. They bulk-load data using APIs or utilities specific to the database implementation. Unlike JDBC, they are not generic, but they provide a performance boost over JDBC access.
DataFlow provides the following database loaders:
Using the LoadActianVector Operator to Load Actian Vector
The LoadActianVector operator is used to bulk-load data into an Actian Vector database. The operator provides a single input port containing the data to load into Vector. Several methods for loading the data are supported:
• Direct: Data is streamed directly into the Vector engine. This method is generally faster than the other methods. It is also more efficient because data is streamed directly into the Vector engine without having to stage it first. This method also supports execution within a cluster environment. As such, data can be efficiently moved from HDFS into Vector. Currently, direct loading is only supported on Linux and Windows 64-bit platforms.
• vwload: The vwload utility provides the generally fastest method to bulk load data into Vector. The utility can only be run on the Vector server machine. The user running vwload must have DBA privileges.
• SQL COPY: This command can be executed on machines remote to the Vector server. The Vector client software must be installed and configured to use SQL COPY. The user does not have to have DBA privileges to run SQL COPY. SQL COPY does require copy permission on the target table.
• COPY VWLOAD: This command can be executed on machines remote to the Vector server. The command allows the vwload utility to be executed using a SQL command. This command is useful if the vwload utility is not available on the path of the target instance. COPY VWLOAD does require copy permission on the target table.
Direct Load
When using the direct load capability, the data is streamed from the input port directly into the Vector engine. Direct loading can run in parallel and supports execution within a cluster environment such as Hadoop. Direct loading can be used to copy data from HDFS into a Vector instance. When run within a Hadoop cluster, the reading of the data, formatting and sending to Vector operations are run distributed, taking full advantage of the Hadoop resources.
When loading into a Vector database up to version 4.2, the direct loader sorts the data on the clustered index before loading, using the parallel execution capabilities of DataFlow. When unique keys are defined and they are the same as the sort keys, the direct loader eliminates duplicates before loading. However, this duplicate elimination applies only to the data being loaded, not to data already in the table. As a result, incremental loads are not supported when sort keys or unique keys are defined on a table.
This loader has certain limitations:
• When both Sort Keys and Unique Keys are defined, they must be the same; otherwise, the load fails.
• When a Sort Key or Unique Key is specified, incremental loads are not supported. The target table must be empty; otherwise, the load fails.
• A DataFlow workflow can load to only one version of Vector at a time. When executing workflows that load to different versions of Vector, the KNIME UI should be restarted.
• To load data using the direct method, the user should have MONITOR privilege for Vector version 3.0 and DB Admin privilege for Vector version 3.5.
• There may be loss of precision when converting double to decimal value. Converting from String to Decimal preserves the precision.
• When loading Time values with time zones the timestamp type should be used to guarantee the data is loaded correctly. When the time zone is unspecified such as when using Time types, the local time zone will be used.
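The precision limitation in the list above is a general property of binary floating point, not something specific to the loader. A minimal sketch using the standard java.math.BigDecimal class shows why a decimal built from a String keeps its digits while one built from a double may not. The class name DecimalPrecisionSketch and its methods are hypothetical and are not part of the DataFlow API.

```java
import java.math.BigDecimal;

// Hypothetical illustration; not part of the DataFlow API.
final class DecimalPrecisionSketch {

    // Converts the exact binary value of the double, including its representation error.
    static BigDecimal fromDouble(double value) {
        return new BigDecimal(value);
    }

    // Parses the decimal digits exactly as written.
    static BigDecimal fromString(String value) {
        return new BigDecimal(value);
    }

    public static void main(String[] args) {
        System.out.println(fromDouble(0.1));   // not exactly 0.1
        System.out.println(fromString("0.1")); // 0.1
    }
}
```

If exact decimal values matter, supplying the source field as a String avoids the intermediate double.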
vwload, COPY VWLOAD, and SQL COPY
When using either the vwload, COPY VWLOAD, or SQL COPY methods for loading, the LoadActianVector operator reads its input data and stages the data into temporary files in the correct format for loading. This data staging is done in parallel according to the current parallelism set for the application, or specifically for the operator. After the data is staged into the load format, the load procedure is invoked (vwload, COPY VWLOAD, or SQL COPY). The staged files are deleted after the load completes, whether successfully or not.
The source fields can be mapped to the target table columns using a provided field map (see the renameMapping property). If a mapping is not provided, the source data fields will be mapped by position to the target table columns. The mapping happens in schema definition order of the target table. The source field names do not have to match the target columns names.
For example, given a table with three columns:
• orderkey (varchar)
• quantity (integer)
• price (double)
Loading data into this table requires three input fields that match the given types (string, integer, and double). If a field mapping is not provided, the input data will be mapped by position to the target columns. So the first input field will be mapped to column orderkey, the second input field to column quantity, and the third input field to column price. If the input data is not ordered correctly, then the load will fail. Providing a field map specifies which input fields map to which database columns. See the code examples for the details of providing a field map.
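The two mapping rules described above can be sketched in plain Java. This illustrates the rules only and is not the loader's implementation: the class FieldMappingSketch and its methods are hypothetical, and rejecting a field count mismatch with an exception is an assumption made for the sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the mapping rules; not part of the DataFlow API.
final class FieldMappingSketch {

    // Without a field map: the i-th source field maps to the i-th target column,
    // whatever the fields are named.
    static Map<String, String> byPosition(List<String> sourceFields, List<String> targetColumns) {
        if (sourceFields.size() != targetColumns.size()) {
            // Assumption for this sketch: a count mismatch is rejected up front.
            throw new IllegalArgumentException("field count must match column count");
        }
        Map<String, String> mapping = new LinkedHashMap<>();
        for (int i = 0; i < sourceFields.size(); i++) {
            mapping.put(sourceFields.get(i), targetColumns.get(i));
        }
        return mapping;
    }

    // With a field map: source fields are matched by name, so their order no longer
    // matters. Source fields missing from the map are dropped.
    static Map<String, String> byName(List<String> sourceFields, Map<String, String> fieldMap) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String field : sourceFields) {
            String column = fieldMap.get(field);
            if (column != null) {
                mapping.put(field, column);
            }
        }
        return mapping;
    }
}
```

With source fields ordered price, quantity, orderkey, byPosition pairs price with the orderkey column, which is the mis-ordered case that causes a load to fail. The same input passed to byName with a suitable map pairs each field with its intended column.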
Enable rollback to abort a data load after the specified number of errors occur. The errors in question are ones caused by failing to parse records from the staged load files. When a data load is aborted and rollback is enabled, no data will be written to the target table. The maximum errors tolerated defaults to zero. Other critical system errors may abort a load directly without consideration of the max errors property.
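The interaction between rollback and the maximum error count can be summarized as a small decision rule. The sketch below is one reading of the description above, not the loader's code: it assumes the load is aborted once the number of parse errors exceeds the tolerated maximum, and the class and method names are hypothetical.

```java
// Hypothetical illustration of the rollback rule; not part of the DataFlow API.
final class RollbackPolicySketch {

    // Assumption: maxErrors is the number of errors tolerated, so the load is
    // aborted only when the error count goes above it. With the default of 0,
    // the first parse error aborts the load.
    static boolean shouldAbort(boolean rollbackEnabled, int maxErrors, int parseErrors) {
        return rollbackEnabled && parseErrors > maxErrors;
    }
}
```

Under this reading, a load with rollback disabled is never aborted because of parse errors, although critical system errors can still end it.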
Enable the jdbcOnly option to limit the data load to using JDBC exclusively instead of opening additional socket connections to the instance. Currently, this option only applies to COPY VWLOAD. This option can be used if additional ports other than the one used for the JDBC connection cannot be created due to security or firewall restrictions.
To capture the input records that are rejected by the vwload, COPY VWLOAD, or SQL COPY load process, set the rejectsPath property. The rejected records will be written to a file at this path. If no rejects are generated, the rejects file will not be created.
The output of vwload, COPY VWLOAD, or the SQL command will be captured and written to the application log at the INFO log level. View this information for details about the load process.
Null Values
When inserting null values with either direct load or vwload, the null settings of the table will be enforced by the loader. This means that an error will be thrown when trying to load a column that is not nullable with a null.
Time and Timestamp Precision
When loading Time and Timestamp columns using vwload, COPY VWLOAD, or SQL COPY the maximum supported precision is milliseconds. If greater precision is required, direct loading should be used, which supports nanosecond precision.
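To see what millisecond precision means for a value, the following sketch reduces a nanosecond-precision time to milliseconds using the standard java.time API. Whether the staged load methods truncate or round is not stated here, so truncation is an assumption, and the class name is hypothetical.

```java
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration; not part of the DataFlow API.
final class TimePrecisionSketch {

    // Drops everything below one millisecond (assumes truncation, not rounding).
    static LocalTime toMillisecondPrecision(LocalTime value) {
        return value.truncatedTo(ChronoUnit.MILLIS);
    }

    public static void main(String[] args) {
        LocalTime original = LocalTime.of(12, 30, 15, 123_456_789);
        System.out.println(toMillisecondPrecision(original)); // 12:30:15.123
    }
}
```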
Connecting to Avalanche/Avalanche Configuration Tips
Actian Avalanche is a cloud-based version of Actian Vector. Currently, Avalanche only supports loading with the COPY VWLOAD method. To load data into an Avalanche instance, the following options must be configured:
• Enable the jdbcOnly option (set it to true).
• Enable encryption using the extraProperties option, that is, add the pair ("encryption", "on").
• Typically, Avalanche instances must use an AWS s3a:// temporary staging location that is in the same AWS region as the instance.
• Most Avalanche instances use a custom port setup. Therefore, the port must be set manually. The instance name is usually empty, although your setup may vary.
Code Examples
The following code example demonstrates the LoadActianVector operator with the Java API, using the direct method. The user and password provided must belong to the DBA account.
Using the LoadActianVector operator in Java (directload)
// Build a map of source field names to target table column names.
// Only these fields will be loaded. Other source fields are dropped.
// Target columns not mapped will be null filled.
Map<String, String> fieldMap = new HashMap<String, String>();
fieldMap.put("orderkey", "l_orderkey");
fieldMap.put("partkey", "l_partkey");
fieldMap.put("suppkey", "l_suppkey");
fieldMap.put("linenumber", "l_linenumber");
fieldMap.put("quantity", "l_quantity");
fieldMap.put("extendedprice", "l_extendedprice");
// Create the operator within an application and set the properties.
// Use direct for this load. No need to set maxErrors, rollback is disabled.
LoadActianVector loader = app.add(new LoadActianVector());
loader.setHost("database-server");
loader.setInstance("VW");
loader.setDatabaseName("tpch");
loader.setTableName("lineitem");
loader.setMethod(LoadMethod.DIRECT);
loader.setUser("user");
loader.setPassword("password");
loader.setRenameMapping(fieldMap);
loader.setRollback(false);
The following example demonstrates using the operator in RushScript. Note that because the vwload utility is used, the application must be run on the Vector server.
Using the LoadActianVector Operator in RushScript (vwload)
var fieldMap = {
'orderkey':'l_orderkey',
'partkey':'l_partkey',
'suppkey':'l_suppkey',
'linenumber':'l_linenumber',
'quantity':'l_quantity',
'extendedprice':'l_extendedprice'
};
// Load data into Vector using vwload.
dr.loadActianVector(
data,
{
host:'<IP address or FQDN of the cluster>',
instance:'VW',
databaseName:'tpch',
tableName:'lineitem',
rejectsPath:'rejects.txt',
renameMapping:fieldMap,
rollback:true,
maxErrors:10,
method:'VWLOAD',
user:'user',
password:'password'
});
Properties
The LoadActianVector operator supports the following properties.
Name | Type | Description |
charset | String | Determines the character set that is used during staging with VWLOAD method. For the vwload supported character sets, see the Actian Vector documentation. |
cleanData | boolean | If this is enabled, additional operations and checks may be performed on the data before loading it into Vector to ensure it meets any table constraints or other requirements. Additionally, invalid values will be loaded as nulls if the table allows instead of producing errors, such as when stringTruncationError or decimalTruncationError are enabled. Default: false. |
database | String | The name of the target database. |
decimalTruncationError | boolean | Whether an error will be thrown if decimal truncation would occur during database loading. Default: false. |
extraProperties | Map<String, String> | A map of key value properties that will be used when creating the connection to the Vector database. |
finalizeTableSQL | String | The SQL statement to execute after processing all the records. For example, this could be used to execute a SQL CREATE INDEX statement. |
host | String | Name of the Vector host. |
initializeTableSQL | String | The SQL statement to execute before processing any records. |
insertMode | String | The insert mode (ROW, BULK, or DEFAULT) that applies for Insert and Merge operations when Direct loading. Default: DEFAULT. |
instance | String | Name of the Vector instance. Default: "VW" |
jdbcOnly | boolean | When jdbcOnly is enabled, the load attempts to use the JDBC connection exclusively instead of remotely executing the command. Default: false. |
maxErrors | int | The maximum errors tolerated before aborting and rolling back a data load. Only applicable when rollback is enabled. Default: 0 |
method | LoadMethod | The load method to use for loading the data. There are currently four choices with the following constraints: • DIRECT: Streams the data directly into the Vector engine. Only a privileged user can use the direct method. The user must also have access to write the target table. • VWLOAD: This utility is used to load the data. The vwload utility must be executed on the Vector server. It is generally a faster method to load data. Only the DBA user can execute vwload. • SQLCOPY: This command can be used remotely and is used to load the data. The Vector client software must be installed and configured for SQL COPY to work. The user does not require DBA permissions but must have copy access to write to the target table. • COPYVWLOAD: This command is used to load the data. It can also be used remotely to initiate execution of vwload through SQL. |
password | String | The password for the user account that has write/copy permissions for the target table. This property is always needed for the SQL copy method. It is optionally required for use with vwload. When provided with vwload, it allows the DBA to impersonate the database user for the purpose of loading data into the target table. |
port | int | The port used by the Vector instance. Default: 7. |
rejectsPath | String | Path to the target file containing rejected records from the data load. If no records are rejected, the file will not be created. |
renameMapping | Map<String, String> | A map of source field names to target table column names. If the input data schema matches the target table schema in number of fields, order, and type of the fields, then this property is not needed. If a map is provided, source fields that are not mapped are dropped. Likewise, target columns that are not mapped will be null-filled. Non-nullable fields that are not mapped will result in a SQL error. |
rollback | boolean | When rollback is enabled, the data load will be aborted and all data rolled back if the max error limit is crossed. If disabled, the data load will continue if errors occur. The rejected records will be written to the rejectsPath file. |
sshPassword | String | Operating system password used to connect to NameNode of the Hadoop installation. This operator uses this password to establish an SSH connection to the machine running the master node of the Vector in Hadoop installation. |
sshUser | String | Operating system user ID used to connect to NameNode of the Hadoop installation. This operator uses this user ID to establish an SSH connection to the machine running the master node of the Vector in Hadoop installation. |
stringTruncationError | boolean | Whether an error will be thrown if string truncation would occur during database loading. Default: false. |
table | String | The name of the table that is the target of the data load. |
tmpDirectory | String | A temporary directory to be used for storing the intermediate loader files. If not set, it will attempt to use the local default temporary directory. A directory must be specified that exists in the HDFS file system on the target Hadoop cluster when loading Vector on Hadoop with vwload or the load will not use the distributed loader. |
user | String | The database user account that has copy permissions to the target database table. This property is always needed for the SQL copy method. It is optionally required for use with vwload. When provided with vwload, it allows the DBA to impersonate the database user for the purpose of loading data into the target table. |
vectorSize | int | The size in units of rows to use for buffering input data before constructing Vector blocks. The available native memory impacts the value of this parameter. For tables with wide columns (more than 1000), this parameter should be adjusted down accordingly to avoid OutofMemoryExceptions. Default: 1024. This property is only valid when the load method is set to DIRECT. |
Ports
The LoadActianVector operator provides a single input port.
Using the LoadMatrix Operator to Load Actian Matrix
The
LoadMatrix operator is used to bulk load data into a Matrix database. The operator provides a single input port containing the data to load into the database instance. Data is loaded directly into the database instance using the ODI capability of the Matrix database. The loading executes in parallel and supports distributed loading when executed on a cluster.
The source fields can be mapped to the target table columns using a provided field map (see the renameMapping property). If a mapping is not provided, the source data fields will be mapped by position to the target table columns. The mapping happens in schema definition order of the target table. The source field names do not have to match the target columns names.
For example, given a table with three columns:
• l_orderkey (varchar)
• l_quantity (integer)
• l_extendedprice (double)
Loading data into this table requires three input fields that match the given types (string, integer, and double). If a field mapping is not provided, the input data will be mapped by position to the target columns. So the first input field will be mapped to column l_orderkey, the second input field to column l_quantity, and the third input field to column l_extendedprice. If the input data is not ordered correctly, then the load will fail. Providing a field map specifies which input fields map to which database columns. See the code examples for the details of providing a field map.
The Actian Matrix database supports the concept of slices. A slice represents a unit of parallelism within the database. Each node within a database instance normally supports multiple slices. The loader operator communicates directly with slices to transfer data into the target database. The number of connections made into the database is determined by the parallelism level of the loader operator. The input data is not repartitioned to match the number of target slices. If the parallelism (partition count) of the loader operator is less than the number of database slices, some slices will be left idle during the load process. To repartition the data to match the number of slices in the target database, explicitly set the parallelism level of the loader operator to match the number of database slices.
A component of the data loader executes within the database. This in-database component supports tracing for debugging. The tracing level can be set to values 0 to 9. By default, the trace level is set to 0. To increase the amount of tracing produced in the database by the loader, set the trace level to a positive, non-zero value. The traceLevel property is used to set the Matrix UDF (user defined function) trace level.
Network communication between this DataFlow operator and the Matrix database is accomplished using TCP/IP sockets. The properties startPortRange and endPortRange can be used to limit the range of TCP/IP ports used for this communication. The ports within this range will need to be available for two-way communication. Within an environment containing a firewall, the ports will need to be “opened” within the firewall configuration. The default port range is from 40000 to 41000 (exclusive). Each machine in both the DataFlow cluster and the Matrix cluster must also have a fully qualified host name, and each of these names must be resolvable on all other machines in both clusters, using either DNS or /etc/hosts (or equivalent) files.
When you run multiple workflows concurrently, or run workflows with multiple Load Matrix operators, ensure the port range is large enough to allow at least one open port per slice per operator. Also, ensure Matrix database workload management is configured to allow enough concurrent queries to leave at least one available for each active Load Matrix operator. This availability is configured using the
max_concurrent_queries property in ~paraccel/padb/rel/etc/padb_wlm.xml on the Matrix server. You may configure a DataFlow service class with specific properties for DataFlow jobs, which will allow the loader to check for available resources before launching a job, and fail quickly if there are insufficient resources on the Matrix server; without this service class, a job may hang when there are insufficient resources. For more information about configuring this service class, see
Installing and Configuring DataFlow for Matrix On-Demand Integration (ODI).
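The port sizing rule above (at least one open port per slice per operator) can be checked with simple arithmetic before a job is launched. The following is a minimal sketch under stated assumptions: the class and method names are hypothetical, the end of the range is treated as exclusive to match the default of 40000 to 41000, and every concurrently running loader is assumed to target a database with the same slice count.

```java
// Hypothetical helper for sizing the port range; not part of the DataFlow API.
final class PortRangeCheck {

    // Number of ports in [startPort, endPort), treating the end as exclusive.
    static int availablePorts(int startPort, int endPort) {
        return Math.max(0, endPort - startPort);
    }

    // At least one open port per slice for each concurrently running loader operator.
    static int requiredPorts(int slices, int concurrentLoaders) {
        return Math.multiplyExact(slices, concurrentLoaders);
    }

    static boolean isLargeEnough(int startPort, int endPort, int slices, int concurrentLoaders) {
        return availablePorts(startPort, endPort) >= requiredPorts(slices, concurrentLoaders);
    }
}
```

For example, four concurrent loaders against a 64-slice database need at least 256 ports, which the default range of 1000 ports covers but a range of 100 ports does not.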
The DataFlow Matrix ODI should be installed on the Matrix database prior to using this operator. The ODI currently supports Actian Matrix versions 4.0, 5.0, and 5.1. For instructions on installing the ODI, see
Installing and Configuring DataFlow for Matrix On-Demand Integration (ODI).
A network connection for data transfer is created for each parallel data worker. To limit the number of network connections used, limit the parallelism of the
LoadMatrix operator. A limit on connections can be explicitly set on the operator using the
maxConnections operator property. See the properties section below for more information.
Note that modifying the parallelism level of the
LoadMatrix operator either through the operator setting or the
maxConnections property may incur the overhead of data redistribution to match the changed parallelism level. To prevent redistribution, set the overall parallelism of the DataFlow job to limit the number of data connections used into Matrix.
Note: Setting the maxConnections property may incur the overhead of data redistribution. Data redistribution occurs to gather data from a higher number of data streams to a lower number of data streams within DataFlow.
The operator supports running a SQL command before the data loading starts (initializeTableSQL property) and after the load ends (finalizeTableSQL property). These SQL commands are submitted to the Matrix system using a JDBC connection. If the initial SQL commands fail, the entire load operation will be abandoned and the DataFlow execution will terminate. The final SQL commands are executed after the load successfully takes place. If the final SQL commands fail, an error is logged and execution is halted. However, the data that has been successfully loaded is not rolled back.
Note: The Time and Time with Time Zone data types were added in Matrix 5.0, but are only supported by the ODI for Matrix versions up to 5.3.1. When loading data into a Matrix 5.0 database, any fields of these types will be populated with null values, and a warning will be generated.
Code Examples
The following code example demonstrates the
LoadMatrix operator with the Java API.
Using the LoadMatrix operator in Java
// Build a map of source field names to target table column names.
// Only these fields will be loaded. Other source fields are dropped.
// Target columns not mapped will be null filled.
Map<String, String> fieldMap = new HashMap<String, String>();
fieldMap.put("orderkey", "l_orderkey");
fieldMap.put("partkey", "l_partkey");
fieldMap.put("suppkey", "l_suppkey");
fieldMap.put("linenumber", "l_linenumber");
fieldMap.put("quantity", "l_quantity");
fieldMap.put("extendedprice", "l_extendedprice");
// Create the operator within an application and set the properties.
LoadMatrix loader = app.add(new LoadMatrix());
loader.setHost("database-server");
loader.setDatabase("test");
loader.setTable("lineitem");
loader.setUser("user");
loader.setPassword("password");
loader.setRenameMapping(fieldMap);
loader.setLogFrequency(100000); // logs progress every 100K records
Using the LoadMatrix Operator in RushScript
var fieldMap = {
'orderkey':'l_orderkey',
'partkey':'l_partkey',
'suppkey':'l_suppkey',
'linenumber':'l_linenumber',
'quantity':'l_quantity',
'extendedprice':'l_extendedprice'
};
// Load data into Matrix.
dr.loadMatrix(
data,
{
host:'leader',
database:'test',
table:'lineitem',
renameMapping:fieldMap,
user:'user',
password:'password',
logFrequency:100000
});
Properties
The
LoadMatrix operator supports the following properties.
Ports
The
LoadMatrix operator provides a single input port.
Using the MatrixSink Operator to Load Actian Matrix from SQL
The
MatrixSink operator is similar to the
LoadMatrix operator. Both operators push data from a DataFlow workflow into a Matrix instance. The difference between the two is the invocation: workflows that contain the MatrixSink operator are deployed and invoked directly from Matrix SQL using a user-defined function (UDF).
Overview
The MatrixSink operator framework works as follows:
1. Create a workflow along with a cluster configuration, serialize it to JSON format, and store it in a table in the Matrix database. You can do this using the DataFlow export option in KNIME. For more information, see
Exporting an Actian DataFlow Workflow.
Note: Serialization creates a binary or textual representation of the specified in-memory object.
2. Run the invoke_dataflow UDF from the Matrix psql console or SQL APIs (JDBC) and pass the ID of the serialized workflow as an argument. The invoke_dataflow UDF retrieves the serialized workflow from the table and adds the connection information for this Matrix instance to the serialized MatrixSink operator.
3. The invoke_dataflow UDF sends the configured serialized workflow to a REST service available on the DataFlow cluster manager.
4. The REST service deserializes the workflow and deploys it to the cluster.
5. When the MatrixSink operator runs on the cluster, it moves its input data to the Matrix instance (similar to the LoadMatrix operator).
To use the MatrixSink operator, perform the following steps:
1. Deploy the invoke_dataflow UDF to the Matrix instance.
2. Create the workflow and REST service configuration tables in the Matrix database.
3. Populate the tables with JSON serialized workflows, cluster configuration, and REST service configuration.
4. Run the invoke_dataflow UDF from the Matrix psql console or other APIs (JDBC).
For more information about the data transfer between the workflow and Matrix instance, see
Using the LoadMatrix Operator to Load Actian Matrix.
Before You Begin
• The invoke_dataflow UDF requires an Actian Matrix version between 5.0.1 and 5.3.1.
• The REST service is included with the cluster manager for DataFlow 6.4 or higher.
Setting Up the Matrix Database
Installing the invoke_dataflow UDF
The DataFlow distribution provides installation scripts for Matrix UDFs. After installing ODI-DataFlow-Matrix5-xy.rpm, the scripts are available in the /opt/paraccel/bin location. For more information, see
Installing and Configuring DataFlow for Matrix On-Demand Integration (ODI).
After running the installation scripts, the invoke_dataflow UDF will be registered with the database.
Note: UDFs are installed for each database.
Creating the workflow and REST service configuration tables in the database
The invoke_dataflow UDF requires two user space tables. One table stores the connection information for the REST service(s). The other table stores the actual serialized workflows and references the associated REST service entry. The schema is fixed for both tables, but the table names can be configured. The default table names are workflowconfigs and workflows.
Workflow REST service config table definition (SQL)
CREATE TABLE workflowconfigs(
name VARCHAR(128) PRIMARY KEY NOT NULL, -- unique name for this REST service configuration
url VARCHAR(1024) NOT NULL, -- URL for the REST service workflow submission
username VARCHAR(128) NOT NULL, -- DataFlow cluster manager user name
password VARCHAR(128) NOT NULL, -- DataFlow cluster manager password
startport INT NOT NULL, -- start of the port range used for DataFlow/Matrix internal communication
endport INT NOT NULL -- end of the port range used for DataFlow/Matrix internal communication
);
If both startport and endport are specified as 0, then the default range (40000–40999) is assumed. However, the port range can be overridden using the UDF parameters upon invocation.
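The default rule for the port range can be expressed as a small helper. This sketch covers only the rule stated above for the configuration table. It does not model the override through UDF parameters, and the class and method names are hypothetical.

```java
// Hypothetical illustration of the default port range rule; not part of the DataFlow API.
final class ConfigPortRange {
    static final int DEFAULT_START = 40000;
    static final int DEFAULT_END = 40999;

    // Returns {start, end}. A 0/0 pair in the configuration table selects the default range.
    static int[] effectiveRange(int startport, int endport) {
        if (startport == 0 && endport == 0) {
            return new int[] {DEFAULT_START, DEFAULT_END};
        }
        return new int[] {startport, endport};
    }
}
```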
Workflow table definition (SQL)
CREATE TABLE workflows(
name VARCHAR(128) PRIMARY KEY NOT NULL, -- unique name for this workflow
workflow VARCHAR(65535) NOT NULL, -- the JSON serialized workflow
config VARCHAR(128) REFERENCES workflowconfigs(name) NOT NULL -- reference to the workflow REST service config
);
Populating the Matrix Database Tables
Populating the REST service configuration table
A workflow REST service configuration entry has a unique name, the submission URL, user credentials for the DataFlow cluster manager running the service, and the port range. The submission URL pattern is /dispatcher/rest/executeWorkflow/submit.
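When configuration entries are generated programmatically, the submission URL can be assembled from the cluster manager host and port plus the fixed path given above. The sketch below assumes plain HTTP. The class name, method name, and the host and port values used with it are hypothetical.

```java
import java.net.URI;

// Hypothetical helper for building the submission URL; not part of the DataFlow API.
final class SubmitUrlSketch {
    // Path taken from the submission URL pattern described above.
    static final String SUBMIT_PATH = "/dispatcher/rest/executeWorkflow/submit";

    // Assumes the REST service is reached over plain HTTP.
    // URI.create throws IllegalArgumentException if the result is not a valid URI.
    static String forClusterManager(String host, int port) {
        return URI.create("http://" + host + ":" + port + SUBMIT_PATH).toString();
    }
}
```

The returned string is the value stored in the url column of the configuration table.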
Workflow REST service config entry example (SQL)
INSERT INTO workflowconfigs(name, url, username, password, startport, endport) VALUES(
'example cluster',
'http://foo.example:1100/dispatcher/rest/executeWorkflow/submit',
'root',
'changeit',
40000,
40999
);
Preparing a workflow
The input required for the workflows table is a
LogicalGraph instance and an
EngineConfig instance.
Important! The workflow must have only one instance of the MatrixSink operator.
Note: The EngineConfig instance determines the cluster on which the workflow is deployed. This can be any DataFlow cluster and does not have to be the cluster that hosts the REST service. If the EngineConfig does not contain a cluster configuration, the workflow runs in plain, non-cluster mode on the REST service.
Populating the workflow table
A workflow entry has a unique name, the JSON serialized workflow with its cluster configuration settings, and a reference to the workflow REST service configuration to use.
The JSON serialized workflow can be obtained from wrapping a LogicalGraph and an EngineConfig in a
LogicalGraphWithConfig and passing it into
JSON serialization.
JSON serialization example (Java)
public String serialize(LogicalGraph graph, EngineConfig config) {
LogicalGraphWithConfig graphWithConfig = new LogicalGraphWithConfig(config, graph);
return new JSON().format(graphWithConfig);
}
The resulting serialized string is inserted into the workflows table.
Workflow insertion example (Java)
public int deployWorkflow(Connection conn, String name, String workflow, String configName) throws SQLException {
    String sql = "INSERT INTO workflows (name, workflow, config) VALUES (?, ?, ?)";
    // try-with-resources closes the statement even if the update fails
    try (PreparedStatement stat = conn.prepareStatement(sql)) {
        stat.setString(1, name);       // unique workflow name
        stat.setString(2, workflow);   // JSON serialized workflow
        stat.setString(3, configName); // name of the REST service config entry
        return stat.executeUpdate();
    }
}
You can also insert into the workflows table directly in SQL.
Workflow insertion example (SQL)
INSERT INTO workflows (name, workflow, config) VALUES (
'example workflow',
'{"engineConfig":{...},"logicalGraph":{...}}',
'example cluster'
);
However, the easiest way to populate the
workflows table is to follow the steps in
Exporting an Actian DataFlow Workflow.
Invoking a Workflow from Matrix SQL
invoke_dataflow syntax (SQL)
SELECT * FROM invoke_dataflow (WITH
workflow('test_workflow') -- required, the ID of the workflow to be executed in the workflow table
target('testtable') -- required, an existing table on this DB specifying the expected data schema
workflow_table('myworkflows') -- optional, table containing the serialized workflows, default: 'workflows'
config_table('myconfigs') -- optional, table containing the REST service config, default: 'workflowconfigs'
start_port_range(42000) -- optional, start of the server socket port range, overrides spec from config table
end_port_range(42999) -- optional, end of the server socket port range, overrides spec from config table
);
The invocation retrieves the entry with ID 'test_workflow' from the myworkflows table and sends the serialized workflow to the REST service specified by the associated myconfigs table entry. The schema of the input to the MatrixSink operator should match the schema of the Matrix database table testtable. Both end points of the internal communication (the MatrixSink operator running in the workflow on the cluster and the invoke_dataflow UDF on the Matrix instance) use the specified port range when opening outbound listening sockets.
• A DataFlow record schema matches a Matrix table schema if all the table column names are present in the record schema and the resulting field/column pairs have compatible types.
• The Matrix column names are normalized to lowercase.
• Fields in the record schema that are not present in the table schema are ignored.
• The table specified by the target parameter is only used as a schema template. The input data is not stored automatically in this table. To move data into this or another table, use
SELECT * INTO <NewTable> FROM invoke_dataflow(WITH [...]);
or
INSERT INTO <ExistingTable> (<column_a> [,...]) SELECT <column_a> [,...] FROM invoke_dataflow (WITH [...]);
• The workflow_table and config_table parameters are optional, but tables for workflow and config must exist in the database. If the parameters are omitted, the tables are assumed to have the respective default names.
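The schema matching rules listed above can be sketched in Java as follows. This is a hedged illustration, not the UDF's implementation: the class is hypothetical, schemas are modeled as simple name-to-type maps, and because the text does not define type compatibility, the sketch assumes two types are compatible only when their names are equal.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch; not part of the DataFlow or Matrix API.
final class SchemaMatchSketch {

    /**
     * @param recordFields DataFlow field name -> type name
     * @param tableColumns Matrix column name -> type name
     * @return true if every table column has a compatible field in the record schema
     */
    public static boolean matches(Map<String, String> recordFields,
                                  Map<String, String> tableColumns) {
        for (Map.Entry<String, String> column : tableColumns.entrySet()) {
            // Matrix column names are normalized to lowercase before the lookup.
            String columnName = column.getKey().toLowerCase(Locale.ROOT);
            String fieldType = recordFields.get(columnName);
            if (fieldType == null) {
                return false; // a table column has no field of the same name
            }
            if (!compatible(fieldType, column.getValue())) {
                return false; // the field/column pair has incompatible types
            }
        }
        // Record fields without a matching column are ignored.
        return true;
    }

    // Assumption: types are compatible only when their names are equal, ignoring case.
    static boolean compatible(String fieldType, String columnType) {
        return fieldType.equalsIgnoreCase(columnType);
    }

    public static void main(String[] args) {
        Map<String, String> record = new HashMap<>();
        record.put("id", "int");
        record.put("name", "string");
        record.put("extra", "double"); // not in the table, so it is ignored
        Map<String, String> table = new HashMap<>();
        table.put("ID", "int");
        table.put("name", "string");
        System.out.println(matches(record, table)); // prints "true"
    }
}
```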
Troubleshooting
Verify the following items while troubleshooting:
• Error messages from the REST service are passed to the UDF and displayed on the Matrix psql console.
• Detailed UDF logging information is available in the STL_UDF_TRACE Matrix system table. For more information, see the Actian Matrix documentation.
• DataFlow cluster manager logs provide details.
• Ensure that the invoke_dataflow UDF is properly installed. For example, check '\df' from the psql console. For more information, see the Actian Matrix documentation.
• Ensure that the hostnames of both the Matrix host and the DataFlow cluster host resolve to a publicly accessible IP or network interface.
• Ensure that the Matrix host can access the REST service host over the network and vice versa.
• Ensure that the Matrix host can be accessed over the network from the DataFlow cluster host and vice versa.
• Ensure that the workflow contains only one instance of the MatrixSink operator.
• Verify the settings in the workflow REST service config table.
Code Examples
The following code example demonstrates using the MatrixSink operator in the Java API.
Using the MatrixSink operator in Java
MatrixSink matrixSink = graph.add(new MatrixSink());
The
MatrixSink operator does not depend on any defined configuration. All the settings are entered by the
invoke_dataflow UDF when the workflow is deployed to the REST service.
Properties
Note: The
MatrixSink operator is not intended to be configured by the user. We recommend that you use the default configuration. Any property that can be set on this operator is public only for implementation reasons.
Ports
The
MatrixSink operator provides a single input port.
HBase Operators
Using the DeleteHBase Operator to Delete from HBase
The
DeleteHBase operator writes delete markers to a table in HBase.
A DeleteMarker can be mapped to a select HBase cell or it can be mapped to a family as a sub table:
1. mapCellMarker(java.lang.String, java.lang.String, com.pervasive.datarush.hbase.DeleteHBase.DeleteMarker)– Map a DeleteMarker to a select cell within a column family. A row key field is required to uniquely identify cells. A DeleteMarker will be inserted for each mapped cell qualifier, for each input record.
2. mapFamilyMarker(java.lang.String, com.pervasive.datarush.hbase.DeleteHBase.DeleteMarker)– Map a DeleteMarker to a column family. If mapping DeleteMarker.DeleteFamily, then only a row key field is required; otherwise both a row key field and a qualifier key field are required to uniquely identify cells. A single DeleteMarker will be inserted for each input record.
A time key field can optionally be specified to allow the user to provide a timestamp value as part of the input record. If a time key field is not specified, then each record will default to the current time. If mapping DeleteMarker.Delete to delete a specific version of a cell, then a time key field is required to uniquely identify cells. DeleteMarkers that delete past versions will use the time key field or default to current time.
The input will be repartitioned using HBase table region row key ranges. Each partition will sort its DeleteMarkers in row-key ascending, qualifier-key ascending, time-key descending order, and then write the DeleteMarkers to the appropriate regions.
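The sort order described above can be expressed as a Java comparator. This is a minimal sketch under stated assumptions: the Marker class is hypothetical and models keys as strings for readability, whereas HBase keys are binary. It is not the operator's internal code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch; Marker is an illustrative stand-in, not a DataFlow class.
final class MarkerOrderSketch {

    static final class Marker {
        final String rowKey;
        final String qualifier;
        final long timestamp;

        Marker(String rowKey, String qualifier, long timestamp) {
            this.rowKey = rowKey;
            this.qualifier = qualifier;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return rowKey + "/" + qualifier + "@" + timestamp;
        }
    }

    // Row key ascending, then qualifier key ascending, then time key descending.
    static final Comparator<Marker> ORDER =
            Comparator.comparing((Marker m) -> m.rowKey)
                    .thenComparing((Marker m) -> m.qualifier)
                    .thenComparing(Comparator.comparingLong((Marker m) -> m.timestamp).reversed());

    /** Returns a sorted copy and leaves the input list untouched. */
    public static List<Marker> sorted(List<Marker> markers) {
        List<Marker> copy = new ArrayList<>(markers);
        copy.sort(ORDER);
        return copy;
    }

    public static void main(String[] args) {
        List<Marker> markers = Arrays.asList(
                new Marker("b", "q1", 5L),
                new Marker("a", "q2", 1L),
                new Marker("a", "q1", 1L),
                new Marker("a", "q1", 9L));
        System.out.println(sorted(markers)); // prints "[a/q1@9, a/q1@1, a/q2@1, b/q1@5]"
    }
}
```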
If the specified HBase table does not exist, it will be created. The number of regions created will be MAX (4, the level of parallelism).
Code Example
The following example demonstrates using the
DeleteHBase operator to delete cells in a table in HBase.
Using the DeleteHBase Operator
DeleteHBase delete = graph.add(new DeleteHBase());
delete.setTableName("table");
delete.setRowFieldName("rowkey");
delete.mapCellMarker("family1", "qualifier1", DeleteMarker.DeleteColumn);
delete.mapCellMarker("family1", "qualifier2", DeleteMarker.DeleteColumn);
delete.mapCellMarker("family2", "qualifier1", DeleteMarker.DeleteColumn);
graph.connect(data.getOutput(), delete.getInput());
Using the DeleteHBase Operator in RushScript
var markerMap = {
'family1':{'qualifier1':'DeleteColumn', 'qualifier2':'DeleteColumn'},
'family2':{'qualifier1':'DeleteColumn'}
}
var data = dr.deleteHBase(
data,
{
tableName:'table',
rowFieldName:'rowKey',
cellMarkerMap:markerMap
});
Properties
The following table provides the properties supported by the
DeleteHBase operator.
Ports
The
DeleteHBase operator provides a single input port.
Using the ReadHBase Operator to Read from HBase
The
ReadHBase operator is used to read a result set from a table in HBase. The result set is specified by various field mappings and can be filtered to only return a specific time range.
When running with parallelism enabled, each partition will read its assigned regions one region at a time in row-key order. This guarantees that records within the same region will not span partitions, and that each partition's output is returned in row-key, qualifier-key order.
Specifying Field Mappings
Because of the nature of NoSQL databases, there may or may not be a particular schema that can be used across all the rows of a given table in HBase. Therefore, the field mappings when reading from HBase must be specified after adding the operator to a logical graph.
Within HBase a cell is uniquely identified by a tuple consisting of {row, column, timestamp}. The cell at a particular row and column with the latest timestamp is considered the latest version of a given cell. Since HBase stores multiple previous versions of a given cell, the default behavior of the operator returns only the latest version of any retrieved cell. Cell versions can be filtered by specifying the time range with the associated properties.
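The version selection described above can be illustrated with a short Java sketch. The class and its types are hypothetical and do not use the HBase or DataFlow APIs. The sketch also assumes a half-open time range (minimum inclusive, maximum exclusive), which is the convention HBase uses for its own time ranges; whether the operator's properties follow the same convention is not stated here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; CellVersion is an illustrative stand-in for stored cell versions.
final class LatestVersionSketch {

    static final class CellVersion {
        final long timestamp;
        final String value;

        CellVersion(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /**
     * Returns the version with the greatest timestamp satisfying
     * minTs <= timestamp < maxTs, or null if no version is in range.
     */
    public static CellVersion latest(List<CellVersion> versions, long minTs, long maxTs) {
        CellVersion best = null;
        for (CellVersion v : versions) {
            boolean inRange = v.timestamp >= minTs && v.timestamp < maxTs;
            if (inRange && (best == null || v.timestamp > best.timestamp)) {
                best = v;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<CellVersion> versions = Arrays.asList(
                new CellVersion(100L, "a"),
                new CellVersion(300L, "c"),
                new CellVersion(200L, "b"));
        // No filtering: the latest version wins.
        System.out.println(latest(versions, 0L, Long.MAX_VALUE).value); // prints "c"
        // Restricting the range exposes an older version.
        System.out.println(latest(versions, 0L, 300L).value); // prints "b"
    }
}
```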
Because HBase by default stores all of its data as pure binary, the
ReadHBase operator will use a catalog table to keep track of the data types that the raw data should be converted into. The various conversion methods for specific DataFlow types are listed in the following table.
All other data types will be serialized or deserialized using the default DataFlow formats. If the entry for a particular table is missing in the catalog, the data will be returned as byte arrays. This allows the user to manually apply any required conversions.
Since HBase is a column-oriented database, the column portion of the index key consists of two parts: a column family and a column qualifier. The column family identifies one of several families that are determined when the table is initially created. Column families provide a way to logically and physically partition columns into groups such that the cells associated with a particular family are stored together in the same files on disk. The column qualifier uniquely identifies a cell and its previous versions within a column family.
DataFlow fields can be mapped to HBase cells in one of two ways:
• mapCell(): Map individual cells within a column family. Cells within a family can be of heterogeneous types and only the mapped cells are accessed. The mapped fields will be together in a single record. An optional row-key field can be specified to uniquely identify each DataFlow record.
• mapFamily(): Map all cells within a column family as a subtable. All cells within a family are of homogeneous types and all cells are accessed. Each cell within a family is contained in an individual record. Optional row-key and qualifier-key fields can be specified to uniquely identify each DataFlow record.
In both cases a cell can contain a single field or a record of fields. Mapping a single cell as a record of fields will allow multiple fields to be packed into a single cell thus greatly increasing I/O performance at the expense of reduced version granularity. All fields packed together in a single cell are versioned together and therefore all fields must be present when writing. The default DataFlow serialization is used exclusively in this case.
Using HCatalog
Field mappings can also be derived from a schema stored in HCatalog. If a table’s schema is stored in HCatalog, the
ReadHBase operator only needs to be directed to this HCatalog table; the HBase table name and field mappings are not required. Individual cells within a column family will be mapped, as defined in the HCatalog table mapping. The resulting DataFlow fields will have the same names as the fields in HCatalog.
To use HCatalog with the ReadHBase operator, specify the HCatalog database name and HCatalog table name where the schema is stored, rather than specifying an HBase table name and field mapping. Optionally, specify a list of fields, as named in HCatalog, to read from the table; if this list is not specified, all fields mapped in HCatalog will be read from the table.
Hadoop Configuration Properties
An HBase cluster has the following minimum connection configuration properties:
fs.default.name
The HDFS URL (such as hdfs://headnode:8020)
hbase.rootdir
The root directory where HBase data is stored (such as hdfs://headnode:8020/hbase)
hbase.zookeeper.quorum
The nodes running ZooKeeper
hbase.zookeeper.property.clientPort
The ZooKeeper client port.
Additionally, other properties may occasionally need to be defined:
hive.metastore.uris
The Hive Metastore URI, if using HCatalog (such as thrift://headnode:9083)
zookeeper.znode.parent
The parent znode in ZooKeeper used by HBase. This should be defined if the default value of /hbase is not being used by your cluster. (For some Hortonworks clusters, you may need to define this property with a value of /hbase-unsecure.)
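As an illustration, the minimum connection properties can be collected in a java.util.Properties object. This is a sketch only: the class, method, and host names are hypothetical, the root directory is derived by appending /hbase to the HDFS URL as in the example values above, and how the properties are handed to DataFlow is not shown.

```java
import java.util.Properties;

// Hypothetical sketch; it only assembles key/value pairs and contacts no cluster.
final class HBaseConnectionPropsSketch {

    /**
     * Builds the minimum connection properties.
     * Assumption: HBase data lives under <hdfsUrl>/hbase, as in the example values.
     */
    public static Properties minimal(String hdfsUrl, String zookeeperNodes, int zookeeperClientPort) {
        Properties props = new Properties();
        props.setProperty("fs.default.name", hdfsUrl);
        props.setProperty("hbase.rootdir", hdfsUrl + "/hbase");
        // hbase.zookeeper.quorum is the standard HBase name for the ZooKeeper node list.
        props.setProperty("hbase.zookeeper.quorum", zookeeperNodes);
        props.setProperty("hbase.zookeeper.property.clientPort",
                Integer.toString(zookeeperClientPort));
        return props;
    }

    public static void main(String[] args) {
        // "headnode", "zk1", and "zk2" are placeholder host names; 2181 is ZooKeeper's usual client port.
        Properties props = minimal("hdfs://headnode:8020", "zk1,zk2", 2181);
        System.out.println(props.getProperty("hbase.rootdir")); // prints "hdfs://headnode:8020/hbase"
    }
}
```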
Code Example
The following example demonstrates using the
ReadHBase operator to read an entire table in HBase. It uses the typical cell-level field mapping (mapCell) to read the table.
Using the ReadHBase Operator in Java
ReadHBase reader = graph.add(new ReadHBase());
reader.setTableName("HBaseTable");
reader.setRowFieldName("rowkey");
reader.mapCell("family1", "qualifier1", "data1");
reader.mapCell("family1", "qualifier2", "data2");
reader.mapCell("family2", "qualifier1", "data3");
graph.connect(reader.getOutput(), operator.getInput());
Using the ReadHBase Operator in RushScript
var data = dr.readHBase({tableName:'HBaseTable', rowFieldName:'rowkey'});
The following example demonstrates using the ReadHBase operator with HCatalog.
Using the ReadHBase Operator with HCatalog in Java
ReadHBase reader = graph.add(new ReadHBase());
reader.setHCatalogDatabase("HCatDatabase");
reader.setHCatalogTable("HCatTable");
graph.connect(reader.getOutput(), operator.getInput());
Using the ReadHBase Operator with HCatalog in RushScript
var data = dr.readHBase({hcatalogDatabase:'HCatDatabase', hcatalogTable:'HCatTable'});
Properties
The
ReadHBase operator supports the following properties.
Ports
The
ReadHBase operator provides a single output port.
Using the WriteHBase Operator to Write to HBase
The
WriteHBase operator is used to write a result set to a table in HBase. If the target table for the write does not exist, it will be automatically created and an entry added to the catalog table to keep track of the current table’s schema. The input set and how it should be mapped to a table in HBase is specified by various field mappings defined within the operator. For more information on how to specify the HBase operator’s field mappings, see the section on specifying field mappings in
Using the ReadHBase Operator to Read from HBase.
When writing to a table in HBase the default behavior will generate a unique binary row key for every inserted record. The user may optionally specify a row key input field within the data set, in which case the input will be repartitioned using HBase table region row-key ranges. Each partition will individually sort blocks of rows using the row and qualifier keys and region boundaries, and will write the rows to the appropriate regions. If the row key is not specified, the input will be written to regions local to the partition and no repartitioning will be performed. A row key input field must be one of the following types to allow serialization to be performed:
• TokenTypeConstant.LONG
• TokenTypeConstant.STRING
• TokenTypeConstant.BINARY
A unique qualifier key will also be generated for each record if the user maps a family as a subtable and does not specify a qualifier key input field. The qualifier key is generated in a similar manner as the row key, and has the same type restrictions.
A timestamp field can optionally be specified to allow the user to provide a timestamp value as part of the input record. If a timestamp field is not specified, then each record will default to the current time. In both cases the timestamp value is narrowed to millisecond resolution to match HBase and will be advanced slightly to uniquely identify cells with duplicate row/qualifier keys and timestamp values. This ensures the operator is tolerant of duplicate cell versions in the input stream as long as they occur infrequently. Importing large numbers of duplicate cell versions in a short amount of time (more than thousands of duplicates per second) may result in significant time skew to maintain uniqueness.
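The timestamp handling described above can be sketched in Java. This is one possible way to obtain the documented behavior, not the operator's actual algorithm: the class is hypothetical, keys are modeled as strings, and advancing in 1 ms steps is an assumption. It also shows why many duplicates cause skew, because each duplicate pushes the assigned time further from the original.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not the WriteHBase implementation.
final class TimestampUniquifierSketch {

    // Remembers every (row key, qualifier key, millisecond timestamp) already assigned.
    private final Set<List<Object>> seen = new HashSet<>();

    /**
     * Narrows the timestamp to millisecond resolution, then advances it
     * one millisecond at a time until the cell coordinates are unique.
     */
    public long assign(String rowKey, String qualifier, Instant timestamp) {
        long millis = timestamp.toEpochMilli(); // drops sub-millisecond precision
        while (!seen.add(Arrays.<Object>asList(rowKey, qualifier, millis))) {
            millis++; // duplicate cell version: skew forward by 1 ms (assumption)
        }
        return millis;
    }

    public static void main(String[] args) {
        TimestampUniquifierSketch uniquifier = new TimestampUniquifierSketch();
        Instant t = Instant.ofEpochSecond(1L, 500_000L); // 1 s plus 0.5 ms
        System.out.println(uniquifier.assign("row1", "q1", t)); // prints "1000"
        System.out.println(uniquifier.assign("row1", "q1", t)); // prints "1001"
        System.out.println(uniquifier.assign("row2", "q1", t)); // prints "1000"
    }
}
```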
Using HCatalog
The
WriteHBase operator can use HCatalog in two ways.
If the target table is already defined in HCatalog, then, as with the ReadHBase operator, no mapping needs to be defined; it is read from HCatalog instead. Define the HCatalog database and table name. Optionally, list the fields you would like to write to the table; if this property is not defined, all fields will be written.
If the target table has not yet been defined in HCatalog, a mapping will need to be provided. However, by providing an HCatalog database and table name, a new entry for this table will be created in HCatalog, which can be used for subsequent reads and writes of this table without manually specifying a mapping. If a list of fields is specified, only these fields will be added to the HCatalog schema; otherwise, all mapped fields are included.
The HCatalog field names will be matched to the DataFlow field names. These field names must be valid for HCatalog: they may contain only lowercase letters and numeric characters. If necessary, the operator described in
Using the DeriveFields Operator to Compute New Fields can be used to rename input fields to match these requirements.
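A field name can be checked against this rule before the graph is built. The following Java sketch is hypothetical and independent of the DataFlow API. It applies the rule exactly as stated here (lowercase letters and digits only), and its suggest method shows just one possible renaming scheme, which can produce empty or colliding names that must be resolved by hand.

```java
import java.util.Locale;
import java.util.regex.Pattern;

// Hypothetical sketch; the rule is taken literally from the text above.
final class HCatalogNameSketch {

    private static final Pattern VALID = Pattern.compile("[a-z0-9]+");

    /** Returns true if the name consists only of lowercase letters and digits. */
    public static boolean isValid(String fieldName) {
        return VALID.matcher(fieldName).matches();
    }

    /**
     * Suggests a replacement name: lowercases the input and drops every other character.
     * Assumption: this scheme is illustrative only; the result may be empty or may collide.
     */
    public static String suggest(String fieldName) {
        return fieldName.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9]", "");
    }

    public static void main(String[] args) {
        System.out.println(isValid("data1"));   // prints "true"
        System.out.println(isValid("Data_1"));  // prints "false"
        System.out.println(suggest("Data_1"));  // prints "data1"
    }
}
```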
Hadoop Configuration Properties
An HBase cluster has the following minimum connection configuration properties:
fs.default.name
The HDFS URL (such as hdfs://headnode:8020)
hbase.rootdir
The root directory where HBase data is stored (such as hdfs://headnode:8020/hbase)
hbase.zookeeper.quorum
The nodes running ZooKeeper
hbase.zookeeper.property.clientPort
The ZooKeeper client port.
Additionally, other properties may occasionally need to be defined:
hive.metastore.uris
The Hive Metastore URI, if using HCatalog (such as thrift://headnode:9083)
zookeeper.znode.parent
The parent znode in ZooKeeper used by HBase. This should be defined if the default value of /hbase is not being used by your cluster. (For some Hortonworks clusters, you may need to define this property with a value of /hbase-unsecure.)
Code Example
The following example demonstrates using the
WriteHBase operator to write a table in HBase. It uses the typical cell-level field mapping (mapCell) to write the table.
Using the WriteHBase operator
WriteHBase writer = graph.add(new WriteHBase());
writer.setTableName("data");
writer.mapCell("family1", "qualifier1", "data1");
writer.mapCell("family1", "qualifier2", "data2");
writer.mapCell("family2", "qualifier1", "data3");
graph.connect(data.getOutput(), writer.getInput());
The following example demonstrates using the WriteHBase operator to write to a new HCatalog table.
Using the WriteHBase operator with a new HCatalog table
WriteHBase writer = graph.add(new WriteHBase());
writer.setTableName("data");
writer.mapCell("family1", "qualifier1", "data1");
writer.mapCell("family1", "qualifier2", "data2");
writer.mapCell("family2", "qualifier1", "data3");
writer.setHCatalogDatabase("hCatDatabase");
writer.setHCatalogTable("hCatTable");
graph.connect(data.getOutput(), writer.getInput());
The following example demonstrates using the WriteHBase operator to write to an existing HCatalog table.
Using the WriteHBase operator with an existing HCatalog table
WriteHBase writer = graph.add(new WriteHBase());
writer.setHCatalogDatabase("hCatDatabase");
writer.setHCatalogTable("hCatTable");
graph.connect(data.getOutput(), writer.getInput());
Properties
The
WriteHBase operator supports the following properties.
Ports
The
WriteHBase operator provides a single input port.