Write I/O Operators
The DataFlow operator library includes several pre-built Input/Output operators. This section covers the Write operators and provides details on how to use them. For more information, refer to the following topics:
WriteAvro Operator
The
WriteAvro operator writes a stream of records in
Apache Avro format. Avro is a commonly used binary format offering data compression and the ability to be parsed in parallel. The writer must provide an encoding schema which is serialized with the data, providing the reader with sufficient metadata to decode it.
The operator can be used to write data with a predefined schema or, if one is not provided, generate a schema automatically. When appending to an existing file, the schema encoded in the target is used, ignoring any schema set on the operator itself.
Input fields are mapped to fields in the output schema by name. Any input fields not mapped to an output field will be omitted from the output. Any output fields not mapped to an input field will be null-valued; if this is the case and the field schema does not allow null values (that is, it is not a UNION involving the NULL schema type), the writer will raise an error.
Input mappings must be to a compatible Avro schema type. An error will be raised if a field has an incompatible type. Furthermore, if a field schema does not allow null values but the source field takes on a null value during execution, the writer will fail with an error. Valid mappings are described in the following table:
The remaining Avro schema types are handled as follows:
• RECORD can only be used as the top-level schema provided to the writer; nested records are not allowed. However, special record types representing the DataFlow DATE, TIME, and TIMESTAMP types are used in mapping generated schemas.
• UNION can be used only if one of the component schemas has a valid mapping.
• ARRAY and MAP are unsupported.
When no schema is provided, either from the target or on the operator, one will be generated based on the input data. The resulting schema will have the same field names and ordering as the input record.
Because DataFlow allows a greater range of field identifiers than Avro, a cleansing process must be applied to field names when generating a schema so that the resulting fields are valid. Names are cleansed by replacing any characters which are not alphanumeric or underscore ('_') with the underscore character. If the resulting name does not start with an underscore or alphabetic character, an underscore is prepended to the name. For example, a field named "field#1" is mapped to "field_1", a field named "#1 field" to "_1_field", and a field named "1st field" to "_1st_field". Should the cleansing process yield the same name for two different fields, schema generation will produce an error.
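The following fragment illustrates the cleansing rules just described. It is illustrative only; the operator applies these rules internally, and no such helper method is part of the DataFlow API.
Field name cleansing rules (illustrative Java)
// Illustrative only: mirrors the cleansing rules described above.
// The WriteAvro operator performs this internally; this helper is not part of the API.
static String cleanseAvroFieldName(String name) {
    // Replace any character that is not alphanumeric or an underscore with '_'
    String cleansed = name.replaceAll("[^A-Za-z0-9_]", "_");
    // Prepend '_' if the result does not start with a letter or an underscore
    if (!cleansed.matches("[A-Za-z_].*")) {
        cleansed = "_" + cleansed;
    }
    return cleansed;
}
// cleanseAvroFieldName("field#1")   returns "field_1"
// cleanseAvroFieldName("#1 field")  returns "_1_field"
// cleanseAvroFieldName("1st field") returns "_1st_field"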
As all DataFlow values are nullable, the generated schema for each field is always a UNION involving NULL plus one other type as indicated by the source field type:
• DOUBLE, FLOAT, LONG, and INT fields are mapped to the primitive Avro types of the same name.
• STRING fields are mapped differently depending on whether they have a FieldDomain associated with them. If they have no domain, they are mapped to the STRING primitive type. If they have a domain, they are mapped to an Avro ENUM type with symbols matching the valid domain.
• BINARY fields are mapped to the BYTES primitive type.
• NUMERIC fields are mapped to the DOUBLE primitive type, with possible loss of precision.
• CHAR fields are mapped to the STRING primitive type.
• DATE fields are mapped to a nested RECORD type consisting of a single LONG field named epochDays.
• TIME fields are mapped to a nested RECORD type consisting of a single INT field named dayMillis.
• TIMESTAMP fields are mapped to a nested RECORD type consisting of three fields: a LONG field named epochSecs, an INT field named subsecNanos, and another INT field named offsetSecs.
As with all file-based write operators, the
WriteAvro operator will treat its sink as a directory by default. A file will be written for each partition of the data stream in this case. This can be overridden using the writeSingleSink property.
When writing Avro data, the metadata containing information about the ordering and partitioning of the data may optionally be saved with the data. This can provide a performance improvement when reading the data later since the data may not need to be re-sorted or partitioned again to ensure it meets various requirements of the graph.
Code Examples
The following code example demonstrates using the
WriteAvro operator to write a set of data files, generating a default schema.
Using the WriteAvro operator in Java
// Construct the writer with the path where the results are to be written.
// Use OVERWRITE mode indicating the output file should be overwritten if it exists already.
WriteAvro writer = graph.add(
new WriteAvro("results/test-ratings", WriteMode.OVERWRITE));
// Connect the output of the reader to the input of the writer
graph.connect(operator.getOutput(), writer.getInput());
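A predefined schema can also be supplied instead of relying on schema generation. The fragment below is a sketch only: it assumes the schema property (used in the RushScript example that follows) is exposed in Java through a setSchema method, and it builds a nullable schema with the standard Avro SchemaBuilder API. The record and field names are illustrative.
Providing a predefined schema in Java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
// Build a schema whose fields are UNIONs of NULL and a primitive type,
// matching the nullable mapping rules described above.
Schema ratingsSchema = SchemaBuilder.record("Rating").fields()
    .optionalInt("userID")
    .optionalInt("movieID")
    .optionalInt("rating")
    .optionalString("timestamp")
    .endRecord();
// Assumption: the writer exposes the schema property as setSchema().
writer.setSchema(ratingsSchema);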
The following example demonstrates using the operator in RushScript, providing the schema as a file (ratings.avrs) containing the JSON serialization of an Avro Schema object.
Using the WriteAvro operator in RushScript
// Note that this operator doesn't return anything
dr.writeAvro(data, {target:'results/test-ratings', mode:'OVERWRITE', schema:'ratings.avrs'});
Properties
The
WriteAvro operator supports the following properties:
Ports
The
WriteAvro operator provides a single input port.
WriteToJDBC Operator
In its simplest form the
WriteToJDBC operator writes records from an input port to a target table in a database using a JDBC driver. Data from the input data source will be added to the target table using database insert statements. Constraints may be applied by the database.
Initial and final SQL statements can be provided. The initial statement is run before any input data is handled. The final statement is executed after all input data has been processed.
The syntax of the supported SQL varies from database to database. Using standard SQL is generally a good practice to prevent errors when switching from one database to another.
The JDBC driver for the database being used must be in the Java classpath of the application using this operator. Databases generally provide a jar file that contains the JDBC driver and its dependencies.
Note that the WriteToJDBC operator operates in parallel by default. Each parallel instance will open a separate connection to the target database. To limit the number of database connections used by this operator, tune down the operator parallelism level.
• Within Java, create an
OperatorSettings instance with the maximum parallelism level set to the desired level. Pass the operator settings instance to the add() method of the composition context within which the operator is being added. See the code example below.
• Within RushScript, set the maxParallelism level property within the property settings of the operator instance. See the code example below.
To read from database tables using JDBC, see
ReadFromJDBC Operator.
Code Examples
The following example reads data from a local file and writes the data into a table in MySQL.
Using the WriteToJDBC Operator in Java to append data to a table
// Create an operator settings specifying the maximum number of parallel instances.
// This is optional. It limits the number of database connections made.
OperatorSettings settings = OperatorSettings.MAX_PARALLELISM(8);
// Create the JDBC writer
WriteToJDBC writer = graph.add(new WriteToJDBC(), settings);
writer.setDriverName("com.mysql.jdbc.Driver");
writer.setUrl("jdbc:mysql://dbserver:3306/test");
writer.setUser("test");
writer.setPassword("test");
// Create mapping to database columns
Map<String, String> mappings= new LinkedHashMap<String, String>();
mappings.put("r_user", "user");
mappings.put("r_movieid", "movieid");
mappings.put("r_rating", "rating");
mappings.put("r_timestamp", "timestamp");
writer.setRenameMapping(mappings);
// Have to specify the table name using the JDBC writer
writer.setTableName("ratings");
// Set the SQL statement used to initialize the target table (as needed).
writer.setInitializeTableSQL(
"create table if not exists ratings (r_user integer, r_movieid integer, r_rating integer, r_timestamp varchar(20))");
writer.setSqlWarningLimit(20);
Notes on the code example:
• The target table name is specified. When using
WriteToJDBC the table name property is required.
• The SQL statement used to initialize the target table is specified. The SQL syntax may vary by database vendor.
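The final SQL statement mentioned earlier can be supplied in the same way as the initial one. The fragment below is a sketch that assumes the property is exposed as finalizeTableSQL, mirroring initializeTableSQL; the index statement is illustrative.
// Assumption: the final SQL statement is exposed as the finalizeTableSQL property.
// It is executed after all input data has been processed.
writer.setFinalizeTableSQL(
    "create index ratings_user_idx on ratings (r_user)");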
Using the WriteToJDBC Operator in RushScript to append data to a table
// The SQL statement used to initialize the table
var initSQL = 'create table if not exists ratings (r_user integer, r_movieid integer, r_rating integer, r_timestamp varchar(20))';
// The database field remapping to use
var mapping = {
"r_user":"user",
"r_movieid":"movieid",
"r_rating":"rating",
"r_timestamp":"timestamp"
};
// Create the write to JDBC operator and set properties
dr.writeToJDBC(data, {
driverName:'com.mysql.jdbc.Driver',
url:'jdbc:mysql://dbserver:3306/test',
user:'test',
password:'test',
tableName:'ratings',
initializeTableSQL:initSQL,
renameMapping:mapping,
sqlWarningLimit:20,
maxParallelism:8}); // optionally set the max number of parallel instances wanted
Properties
The
WriteToJDBC operator supports the following properties.
Ports
The
WriteToJDBC operator provides a single input port.
BinaryWriter Operator
The
BinaryWriter operator writes raw binary data to a file system, either as a single file or as multiple files. It can write arbitrary data as long as the original source file name is known.
The writer must be given the name of the file to be written, including the original extension if applicable, and the field that contains the raw binary data. If writing multiple files, a target folder must be provided and the multiple property must be enabled. Additionally, the filename field must be specified when writing multiple files.
Using the BinaryWriter in Java for Multiple files
// Construct the writer with the path where the results are to be written.
BinaryWriter writer = graph.add(new BinaryWriter("results/files", WriteMode.OVERWRITE));
//Field name containing the raw binary data
writer.setBinaryField("data");
//Name of the field containing the output filenames to use when
//extracting the data.
writer.setFilenameField("fileName");
//Whether the data is written as a single file or multiple files.
writer.setMultiple(true);
Using the BinaryWriter in Java for Single file
BinaryWriter writer = graph.add(new BinaryWriter("results/ReadMe.txt", WriteMode.OVERWRITE));
writer.setBinaryField("data");
writer.setFilenameField(null);
writer.setMultiple(false);
Using the BinaryWriter in RushScript
// RushScript for extracting multiple files
dr.binaryWriter(data, {target:'results/files', mode:WriteMode.OVERWRITE, multiple:(multiple=="true"), binaryField:"id", filenameField:"fileName"});
The following image shows the target for the sample application.
Properties
The
BinaryWriter operator supports the following properties.
Ports
The
BinaryWriter operator provides a single input port.
DeleteFromJDBC Operator
The
DeleteFromJDBC operator deletes rows from a target table using the records from an input port to determine which rows to delete. When deleting rows the key fields must be specified in the operator properties. The key field values will be used within the generated database delete statement to determine which rows qualify for deletion.
This operator provides an output port that will include the data used by the delete operation and an additional column containing the status codes returned by the database for each delete statement.
Initial and final SQL statements can be provided. The initial statement is run before any input data is handled. The final statement is executed after all input data has been processed.
The syntax of the supported SQL varies from database to database. Using standard SQL is generally a good practice to prevent errors when switching from one database to another.
The JDBC driver for the database being used must be in the Java classpath of the application using this operator. Databases generally provide a jar file that contains the JDBC driver and its dependencies.
Note that the DeleteFromJDBC operator operates in parallel by default. Each parallel instance will open a separate connection to the target database. To limit the number of database connections used by this operator, tune down the operator parallelism level.
• Within Java, create an
OperatorSettings instance with the maximum parallelism level set to the desired level. Pass the operator settings instance to the add() method of the composition context within which the operator is being added. See the sketch following this list.
• Within RushScript, set the maxParallelism level property within the property settings of the operator instance.
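For example, the Java fragment below reuses the OperatorSettings pattern shown for the WriteToJDBC operator to limit the number of parallel instances, and therefore database connections, used by DeleteFromJDBC.
Limiting the parallelism of DeleteFromJDBC in Java
// Limit DeleteFromJDBC to at most 4 parallel instances,
// and therefore at most 4 connections to the target database.
OperatorSettings settings = OperatorSettings.MAX_PARALLELISM(4);
DeleteFromJDBC deleter = graph.add(new DeleteFromJDBC(), settings);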
Code Examples
The following example is used to delete rows from a database table. The input data to the operator must contain the key values required to remove data from the target table. This example also demonstrates collecting the output so that the status codes can be recorded.
Using the DeleteFromJDBC operator in Java to delete rows from a table
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.operators.io.jdbc.OutputMode;
import com.pervasive.datarush.operators.io.jdbc.DeleteFromJDBC;
import com.pervasive.datarush.io.WriteMode;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;
public class WriteMySQL {
public static void main(String[] args) {
// Create an empty logical graph
LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("WriteDelete");
// Create a delimited text reader for the "ratings.txt" file
ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
reader.setFieldSeparator("::");
reader.setHeader(true);
RecordTokenType ratingsType = record(INT("r_user"), INT("r_movieid"), INT("r_rating"), STRING("r_timestamp"));
reader.setSchema(TextRecord.convert(ratingsType));
// Create JDBC writer
DeleteFromJDBC writer = graph.add(new DeleteFromJDBC());
writer.setDriverName("com.mysql.jdbc.Driver");
writer.setUrl("jdbc:mysql://dbserver:3306/test");
writer.setUser("root");
writer.setPassword("localadmin");
writer.setTableName("ratings");
// You have to specify the key field names.
writer.setKeyNames(new String[] {"r_user"});
writer.setSqlWarningLimit(20);
// Create a delimited text writer to collect the status codes
WriteDelimitedText logger = graph.add(new WriteDelimitedText("ratings_delete_status.txt", WriteMode.OVERWRITE));
logger.setWriteSingleSink(true);
logger.setHeader(true);
// Connect operators
graph.connect(reader.getOutput(), writer.getInput());
graph.connect(writer.getOutput(), logger.getInput());
// Compile and run the graph
graph.run();
}
}
Using the DeleteFromJDBC Operator in RushScript to delete data from a table
// Create the delete from JDBC operator and set properties
dr.deleteFromJDBC(data, {
driverName:'com.mysql.jdbc.Driver',
url:'jdbc:mysql://dbserver:3306/test',
user:'root',
password:'localadmin',
tableName:'ratings',
keyNames:["r_user"],
sqlWarningLimit:20,
maxParallelism:8}); // optionally set the max number of parallel instances wanted
Properties
The
DeleteFromJDBC operator supports the following properties.
Ports
The
DeleteFromJDBC operator provides a single input port.
The
DeleteFromJDBC operator provides a single output port.
UpdateInJDBC Operator
The
UpdateInJDBC operator updates rows in a target table using the records from an input port to determine which rows to update and the values to apply. When updating rows, the key fields must be specified in the operator properties. The key field values will be used within the generated database update statement to determine which rows qualify for an update. Additionally, the fields to update may be specified in the operator properties. If the fields to update are not set, all non-key fields will be used for the update.
This operator provides an output port that will include the data used by the update operation and an additional column containing the status codes returned by the database for each update statement.
Initial and final SQL statements can be provided. The initial statement is run before any input data is handled. The final statement is executed after all input data has been processed.
The syntax of the supported SQL varies from database to database. Using standard SQL is generally a good practice to prevent errors when switching from one database to another.
The JDBC driver for the database being used must be in the Java classpath of the application using this operator. Databases generally provide a .jar file that contains the JDBC driver and its dependencies.
Note that the UpdateInJDBC operator operates in parallel by default. Each parallel instance will open a separate connection to the target database. To limit the number of database connections used by this operator, tune down the operator parallelism level.
• Within Java, create an
OperatorSettings instance with the maximum parallelism level set to the desired level. Pass the operator settings instance to the add() method of the composition context within which the operator is being added.
• Within RushScript, set the maxParallelism level property within the property settings of the operator instance.
Code Examples
The following example shows how to use the input data of the
UpdateInJDBC operator to update database rows. The "r_user" and "r_movieid" fields from the ratings data are used as keys. The rating value is updated. This example also demonstrates collecting the output so the status codes can be recorded.
Using the UpdateInJDBC operator to update database rows
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.operators.io.jdbc.OutputMode;
import com.pervasive.datarush.operators.io.jdbc.UpdateInJDBC;
import com.pervasive.datarush.io.WriteMode;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;
import com.pervasive.datarush.operators.record.SelectFields;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;
public class UpdateMySQL {
public static void main(String[] args) {
// Create an empty logical graph
LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("WriteMySQL");
// Create a delimited text reader for the "ratings.txt" file
ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
reader.setFieldSeparator("::");
reader.setHeader(true);
RecordTokenType ratingsType = record(INT("r_user"), INT("r_movieid"), INT("r_rating"), STRING("r_timestamp"));
reader.setSchema(TextRecord.convert(ratingsType));
// Create JDBC writer
UpdateInJDBC writer = graph.add(new UpdateInJDBC());
writer.setDriverName("com.mysql.jdbc.Driver");
writer.setUrl("jdbc:mysql://dbserver:3306/test");
writer.setUser("root");
writer.setPassword("localadmin");
writer.setTableName("ratings");
// Specify the field names used as keys in the SQL update commands.
writer.setKeyNames(new String[] {"r_user", "r_movieid"});
writer.setSqlWarningLimit(20);
// Specify the name of the field that should be updated
writer.setUpdateFields(new String[] {"r_rating"});
// Create a delimited text writer to collect the status codes
WriteDelimitedText logger = graph.add(new WriteDelimitedText("ratings_update_status.txt", WriteMode.OVERWRITE));
logger.setWriteSingleSink(true);
logger.setHeader(true);
// Connect operators
graph.connect(reader.getOutput(), writer.getInput());
graph.connect(writer.getOutput(), logger.getInput());
// Compile and run the graph
graph.run();
}
}
Using the UpdateInJDBC Operator in RushScript to update data in a table
// Create the update in JDBC operator and set properties
dr.updateInJDBC(data, {
driverName:'com.mysql.jdbc.Driver',
url:'jdbc:mysql://dbserver:3306/test',
user:'root',
password:'localadmin',
tableName:'ratings',
keyNames:["r_user", "r_movieid"],
updateFields:["r_rating"],
sqlWarningLimit:20,
maxParallelism:8}); // optionally set the max number of parallel instances wanted
Properties
The
UpdateInJDBC operator supports the following properties.
Ports
The
UpdateInJDBC operator provides a single input port.
The
UpdateInJDBC operator provides a single output port.
WriteDelimitedText Operator
The
WriteDelimitedText operator writes a stream of records as delimited text. The writer performs no reordering or filtering of the fields. If this is desired, the stream should be preprocessed with the appropriate operator.
Delimited text supports up to three distinct user-defined sequences within a record, used to identify field boundaries:
• A field separator: found between individual fields. By default this is a comma character.
• A field start delimiter: marking the beginning of a field value. By default this is the double quote character.
• A field end delimiter: marking the end of a field value. By default this is the double quote character.
The field separator cannot be empty. The start and end delimiters can be the same value. They can also both (but not individually) be empty, signifying the absence of field delimiters. The writer will always delimit fields, regardless of whether the values require it. Should a delimited field need to contain the end delimiter, it is escaped from its normal interpretation by duplicating it. For instance, the value ab"c will be formatted as "ab""c".
The writer accepts a
RecordTextSchema to provide formatting information for fields, as well as header row information. It is not required to provide a schema; however, one can be generated from the input data type using default formatting based on the data type. The writer also supports a pluggable discovery mechanism for creating a schema based on the input type, should fine-grained dynamic control be required.
Any schema, supplied or discovered, must be compatible with the input to the writer. To be compatible, a schema must contain a field definition with an assignable type for each field named in the input. Fields present in the schema but not in the input are permitted, with the missing fields assuming null values.
Delimited text data may or may not have a header row. The header row is delimited as usual but contains the names of the fields in the data portion of the record. The writer will emit a header row if it should be present and the write is not appending to an existing file.
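The following fragment sketches supplying an explicit schema rather than relying on discovery. It assumes the schema property is set through a setSchema method and reuses the TextRecord.convert helper shown in the JDBC examples; the field names and types are illustrative.
Supplying an explicit schema to WriteDelimitedText in Java
// Create the writer and supply an explicit text schema built from a record token type.
// Assumption: the schema property is exposed as setSchema().
WriteDelimitedText writer = graph.add(
    new WriteDelimitedText("results/test-ratings", WriteMode.OVERWRITE));
RecordTokenType ratingsType = record(INT("userID"), INT("movieID"), INT("rating"), STRING("timestamp"));
writer.setSchema(TextRecord.convert(ratingsType));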
When writing delimited text data, the metadata containing information about the ordering and partitioning of the data may optionally be saved with the data. This can provide a performance improvement when reading the data later since the data may not need to be re-sorted or partitioned again to ensure it meets various requirements of the graph.
As with all file-based write operators, the
WriteDelimitedText operator will treat its sink as a directory by default. A file will be written for each partition of the data stream in this case. This can be overridden using the writeSingleSink property.
See
ReadDelimitedText Operator to read data in delimited text format.
Code Examples
The following code example demonstrates using the
WriteDelimitedText operator to write a set of data files.
Using the WriteDelimitedText operator in Java
// Construct the writer with the path where the results are to be written.
// Use OVERWRITE mode indicating the output file should be overwritten if it exists already.
WriteDelimitedText writer = graph.add(
new WriteDelimitedText("results/test-ratings", WriteMode.OVERWRITE));
// No start/end field delimiter wanted
writer.setFieldDelimiter("");
// Output a header row at the beginning of the file with field names
writer.setHeader(true);
// Connect the output of the reader to the input of the writer
graph.connect(operator.getOutput(), writer.getInput());
Using the WriteDelimitedText operator in RushScript
// Note that this operator doesn't return anything
dr.writeDelimitedText(data, {target:'results/test-ratings', mode:'OVERWRITE', fieldDelimiter:'', header:true});
Note that the writeSingleSink property was not set. The property defaults to false. Running this application will result in multiple files being written into the target directory. The files will be named "part%n" where %n is replaced with the partition number. The number of files written will correspond to the parallelism configured for the DataFlow engine.
The partial screen shot below shows the results directory that contains the output of the sample application. Eight part files were written since the engine parallelism level was set to eight. Writing files in this manner is optimal, as the individual data streams can be written in parallel.
If for some reason the data is needed in a single file, enable the writeSingleSink property. The following code fragment amends the example above to add the setting of this property. The output will be a single file containing all of the resultant data. Performance will be adversely affected as the data must be gathered into a single stream for writing.
Using a single sink
// Create a delimited text writer
WriteDelimitedText writer = graph.add(new WriteDelimitedText("results/test-ratings.txt", WriteMode.OVERWRITE));
writer.setFieldDelimiter("");
writer.setHeader(true);
writer.setWriteSingleSink(true);
Properties
The
WriteDelimitedText operator supports the following properties.
Ports
The
WriteDelimitedText operator provides a single input port.
WriteFixedText Operator
The
WriteFixedText operator writes a record dataflow as a text file of fixed-width records. The writer requires a schema (
FixedWidthTextRecord) to determine how field values are positioned and formatted. Fields are mapped by name; all fields defined in the schema must be present in the input (and vice versa), although they need not be in the same order. The order of fields in the formatted record is determined by the order in the supplied schema. Records in the file are separated by a non-empty, user-defined character sequence.
The record schema can be manually constructed using the API provided, though this metadata is often persisted externally. Support for Pervasive Data Integrator structured schema descriptors (.schema files) is provided by the
StructuredSchemaReader class.
See
ReadFixedText Operator to read fixed-width text data.
Code Examples
The following example uses the
WriteFixedText operator to write a file in fixed-width format.
Using the WriteFixedText operator in Java
// Define the schema for the account data. This schema can be used
// for reading and writing the account data.
FixedWidthTextRecord schema = new FixedWidthTextRecord(new TextConversionDefaults(StringConversion.NULLABLE_TRIMMED));
schema.defineField("accountNumber", new PaddedTextType(TextTypes.STRING, 9, ' ', Alignment.LEFT));
schema.defineField("name", new PaddedTextType(TextTypes.STRING, 21, ' ', Alignment.LEFT));
schema.defineField("companyName", new PaddedTextType(TextTypes.STRING, 31, ' ', Alignment.LEFT));
schema.defineField("address", new PaddedTextType(TextTypes.STRING, 35, ' ', Alignment.LEFT));
schema.defineField("city", new PaddedTextType(TextTypes.STRING, 16, ' ', Alignment.LEFT));
schema.defineField("state", new PaddedTextType(TextTypes.STRING, 2, ' ', Alignment.LEFT));
schema.defineField("zip", new PaddedTextType(TextTypes.STRING, 10, ' ', Alignment.LEFT));
schema.defineField("emailAddress", new PaddedTextType(TextTypes.STRING, 25, ' ', Alignment.LEFT));
schema.defineField("birthDate", new PaddedTextType(TextTypes.FORMATTED_DATE(new SimpleDateFormat("MM/dd/yyyy")), 10, ' ', Alignment.LEFT));
schema.defineField("accountCodes", new PaddedTextType(TextTypes.STRING, 11, ' ', Alignment.LEFT));
schema.defineField("standardPayment", new PaddedTextType(TextTypes.JAVA_DOUBLE, 6, ' ', Alignment.LEFT));
schema.defineField("payment", new PaddedTextType(TextTypes.JAVA_DOUBLE, 7, ' ', Alignment.LEFT));
schema.defineField("balance", new PaddedTextType(TextTypes.JAVA_DOUBLE, 6, ' ', Alignment.LEFT));
// Create fixed text writer
WriteFixedText writer = graph.add(new WriteFixedText("results/accounts-fixedwidth.txt", WriteMode.OVERWRITE));
writer.setSchema(schema);
writer.setWriteSingleSink(true);
Here is a fragment of the data output by the example
WriteFixedText operator.
01-000667George P Schell Market Place Products 334 Hilltop Dr Mentor OH44060-1930warmst864@aol.com 02/28/1971XA 101.0 100.0 15.89
01-002423Marc S Brittan Madson & Huth Communication Co 5653 S Blackstone Avenue, #3E Chicago IL60637-4596mapper@tcent.net 06/30/1975BA 144.0 144.0 449.92
01-006063Stephanie A Jernigan La Salle Clinic 77565 Lorain Akron OH44325-4002dram@akron.net 11/02/1941EB|CB 126.0 126.0 262.98
01-010474Ernie Esser Town & Country Electric Inc. 56 Pricewater Waltham MA2453 hazel@bentley.net 12/15/1962JA|RB 127.0 127.0 271.75
01-010852Robert A Jacoby Saturn of Baton Rouge 4001 Lafayette Baton Rouge LA70803-4918din33@norl.com 12/22/1985ED|EA|RB|KA142.0 150.0 423.01
01-011625James C Felli Bemiss Corp. 23A Carolina Park Circle Spartanburg SC29303-9398cadair@gw.com 02/21/1940SB 151.0 155.0 515.41
01-018448Alan W Neebe Georgia State Credit Union PO Box 159 Demorest GA30535-1177delores@truett.com 01/31/1960MA|ED|SB 113.0 120.0 131.89
01-018595Alexander Gose Office Support Services 436 Green Mountain Circle New Paltz NY12561-0023dams@matrix.net 06/19/1940EC 147.0 147.0 477.09
The following example demonstrates writing fixed-width data using RushScript.
Using the WriteFixedText operator in RushScript
var accountsFixedSchema = dr.schema({type:'FIXED'})
.nullable(true)
.trimmed(true)
.padChar(' ')
.alignment('LEFT')
.STRING("accountNumber", {size:9})
.STRING("clientName", {size:21})
.STRING("companyName", {size:31})
.STRING("streetAddress", {size:35})
.STRING("city", {size:16})
.STRING("state", {size:2})
.STRING("zip", {size:10})
.STRING("emailAddress", {size:25})
.DATE("birthDate", {pattern:'MM/dd/yyyy', size:10})
.STRING("accountCodes", {size:11})
.DOUBLE("standardPayment", {pattern:'0.00', size:6})
.DOUBLE("payment", {pattern:'0.00', size:7})
.DOUBLE("balance", {pattern:'0.00', size:6});
// Write fixed-width data
dr.writeFixedText(
data,
{
target:'/path/to/file.txt',
mode:'OVERWRITE',
schema:accountsFixedSchema
});
Properties
The
WriteFixedText operator supports the following properties.
Ports
The
WriteFixedText operator provides a single input port.
WriteSink Operator
The
WriteSink operator writes a stream of records to a data sink. The sink accepts a sequence of bytes that represent formatted records, identical in logical structure. The mapping between physical and logical structure is encapsulated in a format descriptor, which must be provided.
This operator is low level, providing a generalized model for writing files in a distributed fashion. Typically,
WriteSink is not directly used in a graph. Instead, it is indirectly used through a composite operator such as one derived from
AbstractWriter, providing a more appropriate interface to the end user.
Parallelized writes are supported by creating multiple output sinks from the original target, called fragments, each representing the portion of data on one partition. In the case of a write to a file system, this would result in a directory of files. To write in parallel the target must support the concept of fragmenting. If a target does not support fragmenting the write is forced to be non-parallel.
A port is provided that signals completion of the writer. This can be used to express a dependency between operators based on the target having been successfully written.
The writer makes a best-effort attempt to validate the target before execution, but cannot always guarantee correctness depending on the nature of the target. This validation helps prevent failures that could result in the loss of a significant amount of work, such as a misconfigured write failing only in a late phase of graph execution.
Tip... This is a low-level operator that typically is not directly used. It can be used with a custom data format. A custom data format may be needed to support a format not provided by the DataFlow library.
Code Example
This example code fragment demonstrates how to set up a writer for a generic file type.
Using the WriteSink operator
WriteSink writer = new WriteSink();
writer.setTarget(new BasicByteSink("filesink"));
writer.setMode(WriteMode.OVERWRITE);
writer.setFormat(new DelimitedTextFormat(TextRecord.convert(record(INT("intfield"),STRING("stringfield"))),
new FieldDelimiterSettings(),
new CharsetEncoding()));
Properties
The
WriteSink operator supports the following properties.
Ports
The
WriteSink operator provides a single input port.
WriteStagingDataset Operator
The
WriteStagingDataset operator writes a sequence of records to disk in an internal format for staged data. Staged data sets are useful as they are more efficient than text files, being stored in a compact binary format. If a set of data must be read multiple times, significant savings can be achieved by converting it into a staging data set first.
It is generally best to perform parallel writes, creating multiple files. This allows reads to be parallelized effectively, as the staging format is not splittable.
There are several use cases for directly using staging data sets. A few examples are:
• A particular task to be accomplished must be broken into several DataFlow applications. Data from each application must be passed to the next. Staging data sets can be used to capture the intermediate data, allowing for quick and easy access.
• Data within a database is being used to build a predictive model. This development will require many iterations of algorithm testing with the data.
For this use case, the most efficient route is to first read the data from the source database and write it to staging data sets. Then use the staged data as input into the predictive analytic development. The one-time read from the database is overhead. However, given that the source data will be read many times, the savings will be substantial.
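The staging step for this use case is sketched below. It assumes the ReadFromJDBC operator exposes connection properties mirroring those shown for WriteToJDBC; the connection details and table name are illustrative.
Staging database data with WriteStagingDataset in Java
// Read the source table once.
// Assumption: ReadFromJDBC exposes connection properties mirroring WriteToJDBC.
ReadFromJDBC dbReader = graph.add(new ReadFromJDBC());
dbReader.setDriverName("com.mysql.jdbc.Driver");
dbReader.setUrl("jdbc:mysql://dbserver:3306/test");
dbReader.setUser("test");
dbReader.setPassword("test");
dbReader.setTableName("ratings");
// Stage the data; later applications read the staged copy instead of the database.
WriteStagingDataset stager = graph.add(
    new WriteStagingDataset("results/ratings-stage", WriteMode.OVERWRITE));
graph.connect(dbReader.getOutput(), stager.getInput());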
To read data in the staging data set format, see
ReadStagingDataset Operator.
Code Examples
The following code example reads from a delimited text file and writes the data to a staging data set.
Using the WriteStagingDataset operator in Java
// Write to a staging dataset overwriting existing content
WriteStagingDataset datasetWriter = graph.add(
new WriteStagingDataset("results/ratings-stage", WriteMode.OVERWRITE));
Using the WriteStagingDataset operator in RushScript
// Note: no return value
dr.writeStagingDataset(data, {target:'results/ratings-stage', mode:'OVERWRITE'});
The following partial screen shot shows the results directory that contains the output of the sample application. Eight part files were written since the engine parallelism level was set to eight. Writing files in this manner is optimal, as the individual data streams can be written in parallel. This is recommended for staging data sets because the format is not splittable. Having a single file output limits the parallelism when reading.
Properties
The
WriteStagingDataset operator supports the following properties.
Ports
The
WriteStagingDataset operator provides a single input port.
WriteARFF Operator
The
WriteARFF operator writes files using the Attribute-Relation File Format (ARFF). ARFF supports both sparse and dense formats. The format to use is specified with the format mode property. The default mode is sparse. When the input data is highly sparse, the sparse mode can save space since it only writes non-zero values. However, if the data is not very sparse, using sparse mode can actually cause the resultant file to be larger. Use dense mode for dense data.
Note that when writing sparse data that contains enumerated types, the zero ordinal category will always be treated as sparse and so will not appear in the output file. This is normal and expected behavior. The enumerated type is captured in the metadata. When the file is read, the zero ordinal values will be restored as expected.
ARFF provides a metadata section at the start of the file. The metadata may contain several items:
• Comments: free-form, user-supplied comments.
• Relation name: a user name given to the relation contained within the ARFF file.
• Attribute definitions: defines the field name and type of each data field. The attribute definitions define the schema of the file. The definitions are generated by the operator.
ARFF is useful when the input data is sparse. Since ARFF includes metadata such as the file schema, it is also very useful and convenient to use. The dense format is a simple CSV format and as such is very similar to the format written by the WriteDelimitedText operator.
ARFF data can be read using the
ReadARFF Operator.
Code Examples
The following example writes an ARFF file in sparse mode.
Using the WriteARFF operator in Java
// Write to an ARFF file in sparse mode
WriteARFF arffWriter = graph.add(new WriteARFF("results/ratings-arff.txt", WriteMode.OVERWRITE));
arffWriter.setFormatMode(ARFFMode.SPARSE);
arffWriter.setRelationName("Movie Ratings");
arffWriter.addComment("This is an ARFF file written using DataFlow");
arffWriter.setWriteSingleSink(true);
Notes from the example:
• The ARFF mode is set to sparse. This enables writing the data in sparse format. Any numeric data values equal to zero will not be written to the output, thereby saving space. When the data is sparse, using the sparse format is very efficient.
• The relation name is set. This is optional. The relation name is part of the ARFF metadata. It is used by consumers of ARFF files to distinguish the data set.
• A comment is added. This is also optional. Comments are added at the beginning of the ARFF metadata section. When reading an ARFF file the comments can also be retrieved. There is no practical limit to the number of comment lines that may be added.
• The writeSingleSink property is enabled. This option forces the writing of a single output file. The default is to write multiple files in parallel.
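For dense data, the same writer can be configured in dense mode. The fragment below is a sketch; it assumes the dense setting is the DENSE value of the same ARFFMode enumeration used above.
Using the WriteARFF operator in dense mode (Java sketch)
// Write the data in dense (CSV-like) mode instead of sparse mode.
// Assumption: the dense format mode is ARFFMode.DENSE.
WriteARFF denseWriter = graph.add(new WriteARFF("results/ratings-dense-arff.txt", WriteMode.OVERWRITE));
denseWriter.setFormatMode(ARFFMode.DENSE);
denseWriter.setRelationName("Movie Ratings");
denseWriter.setWriteSingleSink(true);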
Using the WriteARFF operator in RushScript
dr.writeARFF(data, {
target:'results/ratings-arff.txt',
mode:'OVERWRITE',
formatMode:'SPARSE',
relationName:'Movie Ratings',
comments:'This is an ARFF file written using DataFlow',
writeSingleSink:'true'});
The following shows the first few lines of the ARFF file written by the code example. The metadata appears at the top of the file. Metadata statements begin with the "@" character. Comments begin with the "%" character. The data follows after the DATA statement. Data rows in sparse format are wrapped with curly braces. Data values are comma-separated. Each data value begins with the field index followed by the data value.
% This is an ARFF file written using DataFlow
@relation 'Movie Ratings'
@attribute userID integer % int32
@attribute movieID integer % int32
@attribute rating integer % int32
@attribute timestamp string
@data
{0 1,1 1193,2 5,3 978300760}
{0 1,1 661,2 3,3 978302109}
{0 1,1 914,2 3,3 978301968}
{0 1,1 3408,2 4,3 978300275}
{0 1,1 2355,2 5,3 978824291}
{0 1,1 1197,2 3,3 978302268}
{0 1,1 1287,2 5,3 978302039}
{0 1,1 2804,2 5,3 978300719}
{0 1,1 594,2 4,3 978302268}
Properties
The
WriteARFF operator supports the following properties.
Ports
The
WriteARFF operator provides a single input port.
ForceRecordStaging Operator
Using the
ForceRecordStaging operator forces the operators on the input and output sides of the operator to execute sequentially, instead of concurrently. In a dataflow graph, consumers and producers run simultaneously, exchanging data as the operators are ready. This permits the graph to exploit pipeline parallelism. The data is staged to disk in configured temporary storage spaces.
In some cases, it may be necessary to avoid this concurrency. The simplest example is:
• Operator A produces data.
• Operator B consumes data from A, producing data only when all data from A has been consumed. (Calculating a total, for instance.)
• Operator C consumes the output of both A and B, but does not consume any data from A until after consuming data from B.
In this case, a memory usage dilemma, known as queue expansion, arises. B needs all of A's data, but C does not read anything from A until B produces some data. As the data unread by C cannot be discarded, it must be saved until it is read. To minimize memory requirements, the lag between readers is limited. This limit is raised when required, but in this case it would require buffering the entire output of A, which could easily exceed memory.
Usually, this is automatically handled by the framework, saving data to disk. This is done in an optimistic fashion, avoiding disk I/O and the associated performance degradation until necessary, at the cost of a (small) potential risk of graph failure due to memory being exhausted. This operator provides a pessimistic approach to the problem that forces all data to disk, with all the associated performance costs.
Tip... Use the ForceRecordStaging operator directly in situations where it is known that queue expansion will occur. Direct usage prevents the overhead of handling queue expansion.
Code Example
The code example demonstrates a common use case requiring use of the
ForceRecordStaging operator. A data source is aggregated using the
Group operator. Aggregation causes a reduction of the input data. The output of
Group is then wired into a
Join operator. This allows joining the aggregated results back to the original data. However, only a few key groups are known to exist. This implies the join will consume large amounts of the original source while waiting for data from the group. Forcing the data to be staged prevents queue expansion and the overhead of handling the expansion at run time.
Note that the example is not complete: code to produce the source is not provided, and properties are not set on the group and join operators.
Using the ForceRecordStaging operator
// Aggregate the source data
Group group = graph.add(new Group());
graph.connect(source, group.getInput());
// Force staging of source data to prevent queue expansion.
// The group operator reduces the input row count considerably.
ForceRecordStaging forceStaging = graph.add(new ForceRecordStaging());
graph.connect(source, forceStaging.getInput());
// Join the original data to the aggregated results. Use the staged data
// instead of the original source to prevent queue expansion.
Join join = graph.add(new Join());
graph.connect(forceStaging.getOutput(), join.getLeft());
graph.connect(group.getOutput(), join.getRight());
Using the ForceRecordStaging operator in RushScript
// Force a dataset to be staged
var stagedData = dr.forceRecordStaging(data);
Properties
The
ForceRecordStaging operator has no properties.
Ports
The
ForceRecordStaging operator provides a single input port.
The
ForceRecordStaging operator provides a single output port.
WriteORC Operator
The
WriteORC operator writes data files in
Apache Optimized Row Columnar (ORC) format. The ORC format is supported by Apache Hive.
The operator is registered as
writeORC for use in JSON or RushScript. This operator is based on the
AbstractWriter operator and supports the features of the base class, in particular parallel writing. Each part file created in this way is a logical ORC file. Similar to the
ReadORC operator, the WriteORC operator can access the Hive libraries using the shim layer. The operator uses the
ORCFile Writer API.
Note: The operator requires a Hadoop module configuration to be enabled even if the workflow does not run on the cluster or access HDFS.
For YARN jobs, the configuration property job.master.resources.memory must be set to a minimum of 256 MB. The default value is 64 MB. It is available under Container Configuration in the Cluster Administration portal.
For more information about ORC format, go to the
ORC file format manual.
A few DataFlow data types, such as CHAR and TIME, are not supported for writing because they have no matching ORC type. If a schema containing unsupported data types is encountered, the WriteORC operator will throw an exception when the graph is compiled.
In general, DataFlow types are mapped to ORC schema types as shown in the following table.
Properties
The
WriteORC operator provides the following properties, which are derived directly from the ORC
WriterOptions.
Note: Before running the workflow, ensure that the client configuration and the jar files are added to the classpath. For more information, see
Integrating DataFlow with Hadoop.
Code Examples
The following example shows how to write ORC file data using Java.
Using the WriteORC operator in Java
WriteORC writer = graph.add(new WriteORC("hdfs://localhost:9000/dforc.orc"));
writer.setMode(WriteMode.OVERWRITE);
writer.setCompression(ORCCompression.SNAPPY);
writer.setBlockPadding(true);
The following example shows how to write ORC file data using RushScript.
Using the WriteORC operator in RushScript
var writer = dr.writeORC("orc-out", source, {target:"file:///data/dforc.orc", writeSingleSink:true});
Ports
The
WriteORC operator provides a single input port.