ByteSource and ByteSink I/O Operators
The DataFlow operator library includes several prebuilt input/output operators. This section covers the ByteSource and ByteSink operators and provides details on how to use them.
About ByteSource Implementations
The ByteSource interface defines a way to obtain binary data from data sources for downstream decoding, parsing, and data extraction. Separating this interface from the specific read operators allows readers to access source data in many different ways without needing to write a new operator for each possible access path.
File-based readers expose convenience methods for setting the ByteSource to use as either a Path or a String containing the path to the files to access. Underneath, the Path and String values are converted into ByteSource objects for use in the lower-level I/O functions.
In general, the Path or String interfaces for the file-based read operators provide the functionality needed. For example, wildcard characters can be used to specify a pattern of files to read. Likewise, passing in a directory implies that all files in the directory should be read as one logical unit. However, sometimes it is necessary to use a ByteSource directly. The ByteSource implementations are covered in detail with usage examples in this section.
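For instance, a minimal sketch of the wildcard form follows. The file names are illustrative, and the fragment assumes a LogicalGraph named graph has already been created, as in the full examples later in this section.
// Read all files matching a wildcard pattern as one logical unit.
// The reader converts the String path into a ByteSource internally.
ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings-*.txt"));
reader.setFieldSeparator("::");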
For more information about the ByteSource interface and the classes that implement it, see the JavaDoc reference documentation.
BasicByteSource Operator
A BasicByteSource represents reading data from a single path, which can be provided as either a Path or a String parameter. If the path is a directory, all files in the directory are read. These interfaces provide no more functionality than using a reader directly with a Path for the path name.
A BasicByteSource will also detect compressed data automatically based on file suffixes, although additional constructors provide the ability to set the compression format explicitly.
A constructor is also provided that allows specifying an action to take if an input file cannot be read. This is useful for situations where a directory path name is given and a subset of the files contained within it cannot be accessed for reading.
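A minimal sketch of that constructor is shown below. The exact signature is an assumption based on the description above; consult the JavaDoc for the precise form. UnreadableSourceAction.FAIL (used in the GlobbingByteSource example later in this section) causes an exception to be raised when a file cannot be read.
// Hypothetical sketch: read all files in a directory, failing fast
// if any of them cannot be opened (constructor form assumed).
BasicByteSource source =
    new BasicByteSource(
        Paths.asPath("data/logs"),      // a directory: read all contained files
        UnreadableSourceAction.FAIL);   // action taken for unreadable files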
Following is an example of creating a byte source that supports compression (meaning the data being read is already compressed and should be decompressed during the reading process).
Using the BasicByteSource operator with Compression
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.Paths;
import com.pervasive.datarush.io.compression.CompressionFormats;
import com.pervasive.datarush.operators.io.BasicByteSource;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.sink.LogRows;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;
/**
* Read a file that is compressed.
*
*/
public class ReadCompressedRatings {
    public static void main(String[] args) {
        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ReadRatings");

        // List the supported compression formats
        System.out.println("supported formats: " + CompressionFormats.listSupportedFormats());

        // Create the byte source for the compressed data
        BasicByteSource source =
            new BasicByteSource(
                Paths.asPath("data/ratings.txt.gz"),
                CompressionFormats.lookupFormat("gzip"));

        // Create the reader using the byte source
        ReadDelimitedText reader = graph.add(new ReadDelimitedText());
        reader.setSource(source);
        reader.setFieldSeparator("::");
        reader.setHeader(true);
        RecordTokenType ratingsType = record(
            INT("userID"),
            INT("movieID"),
            INT("rating"),
            STRING("timestamp"));
        reader.setSchema(TextRecord.convert(ratingsType));

        // Log results
        LogRows logger = graph.add(new LogRows(1));
        graph.connect(reader.getOutput(), logger.getInput());

        // Compile and run the graph
        graph.run();
    }
}
Notes about the previous example:
• The list of supported compression formats is printed out. Compression formats are extensible, so custom formats can be added as needed.
• A BasicByteSource is constructed. In this case we provide a Path to the files to read and the compression format to use. The helper class CompressionFormats is used to obtain the CompressionFormat implementation for the "gzip" compression type. Since the source name here ends with ".gz", providing a format is not strictly required; the ByteSource would detect the format automatically in this case.
• The created byte source is passed to the reader as the source property. The reader uses the byte source to obtain binary data for decoding, parsing, and data extraction. The source decompresses the input data before passing it on to the reader, allowing the reader to remain agnostic about any preprocessing that must take place before parsing is applied.
ConcatenatedByteSource Operator
A ConcatenatedByteSource combines several ByteSource objects into a single ByteSource. When read, the source produces the data from each underlying source in turn, resulting in a concatenation of the contents.
A typical usage is to read a provided list of files. Usually a list of files can be read either by reading all of the files in a directory or by using wildcards in file names to match a specific pattern. At times this functionality may not be sufficient. In that case, a list of files can be generated and used to construct a ConcatenatedByteSource.
As with the BasicByteSource, once constructed, the byte source is handed to a reader as the source property. The reader then consumes the bytes of the files from the source and processes them as needed.
Following is an example using a ConcatenatedByteSource. The example is contrived because the files are within the same directory and could have been specified using a wildcard pattern in the file name.
Using the ConcatenatedByteSource operator
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.LONG;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.TIMESTAMP;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import java.util.Arrays;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.Path;
import com.pervasive.datarush.io.Paths;
import com.pervasive.datarush.operators.io.ByteSource;
import com.pervasive.datarush.operators.io.ConcatenatedByteSource;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.sink.LogRows;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;
/**
* Read a list of files as one logical unit.
*
*/
public class ReadMultipleFiles {
    public static void main(String[] args) {
        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ReadStaging");

        // Create the byte source for the list of files to read
        Path[] paths = new Path[] {
            Paths.asPath("data/logs1.txt"),
            Paths.asPath("data/logs2.txt"),
            Paths.asPath("data/logs3.txt"),
        };
        ByteSource source = ConcatenatedByteSource.getConcatenatedSource(Arrays.asList(paths));

        // Create the reader using the byte source
        ReadDelimitedText reader = graph.add(new ReadDelimitedText());
        reader.setSource(source);
        reader.setFieldSeparator("|");
        RecordTokenType type = record(
            STRING("entityID"),
            TIMESTAMP("timestamp"),
            LONG("siteID"),
            INT("compromised"),
            LONG("eventID"));
        reader.setSchema(TextRecord.convert(type));

        // Log results
        LogRows logger = graph.add(new LogRows(1));
        graph.connect(reader.getOutput(), logger.getInput());

        // Compile and run the graph
        graph.run();
    }
}
Notes about the ConcatenatedByteSource example:
• A list of Path objects is assembled representing the files to read. This list can be built by whatever means necessary. The example builds a simple list of files that exist in the same directory; however, the list of files could come from different directories with different naming schemes. The data contained in the files must all be of the same expected format; if a header were present, it would need to appear in each file.
• The list of Path objects is converted into a ByteSource using the static method getConcatenatedSource(). This method creates the ConcatenatedByteSource from the list of Paths.
• The byte source is set as the source property on the reader.
GlobbingByteSource Operator
A GlobbingByteSource applies a file globbing pattern to the file name within a file path. Globbing is the capability of using wildcard characters within a file name to match files in an actual file system. After the globbing is accomplished, a GlobbingByteSource acts similarly to a ConcatenatedByteSource, using the list of matched paths as the files to be read as one logical unit.
The interfaces into the file read operators allow setting the source as a String. When this interface is used, the given String may contain any supported wildcard characters. The readers use a GlobbingByteSource in that case to resolve the actual files to be read.
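As an illustration, the two forms below are intended to be equivalent; the setSource(String) overload and the default unreadable-file handling are assumptions to verify against the JavaDoc.
// String convenience form: the reader resolves the wildcard internally
// by constructing a GlobbingByteSource (overload assumed).
reader.setSource("data/logs?.txt");

// Explicit form, which additionally lets you choose the action taken
// when a file matching the pattern cannot be read.
reader.setSource(new GlobbingByteSource("data/logs?.txt", UnreadableSourceAction.FAIL));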
The only reason, then, to use a GlobbingByteSource directly is to specify how to handle files that match the globbing pattern but are unreadable.
The following code example uses a GlobbingByteSource directly, setting the action to take when a source file is unreadable.
Using the GlobbingByteSource operator
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.LONG;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.TIMESTAMP;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.operators.io.ByteSource;
import com.pervasive.datarush.operators.io.GlobbingByteSource;
import com.pervasive.datarush.operators.io.UnreadableSourceAction;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.sink.LogRows;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;
/**
* Read a list of files as one logical unit.
*
*/
public class ReadGlobbedFiles {
    public static void main(String[] args) {
        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ReadStaging");

        // Create a globbing byte source using a simple wildcard
        ByteSource source = new GlobbingByteSource("data/logs?.txt", UnreadableSourceAction.FAIL);

        // Create the reader using the byte source
        ReadDelimitedText reader = graph.add(new ReadDelimitedText());
        reader.setSource(source);
        reader.setFieldSeparator("|");
        RecordTokenType type = record(
            STRING("entityID"),
            TIMESTAMP("timestamp"),
            LONG("siteID"),
            INT("compromised"),
            LONG("eventID"));
        reader.setSchema(TextRecord.convert(type));

        // Log results
        LogRows logger = graph.add(new LogRows(1));
        graph.connect(reader.getOutput(), logger.getInput());

        // Compile and run the graph
        graph.run();
    }
}
Notes about the GlobbingByteSource example:
• The GlobbingByteSource is created with a file path containing a wildcard pattern for the file name. The action for unreadable source files is set to FAIL: any file matching the pattern that cannot be accessed will cause an exception.
• The byte source is used as the source property for the reader. The reader will access all of the files matching the pattern as one logical unit of data.
About ByteSink Implementations
The ByteSink interface defines a way to abstractly write byte data from a writer to a target file or set of files. Separating this interface from the file-based writer operators allows writer implementations to target data without direct knowledge of where or how the data is being written.
File-based writers expose convenience methods for setting the ByteSink to use as either a Path or a String containing the path to the files to write. Underneath, the Path and String values are converted into ByteSink objects for use in the lower-level I/O functions.
In general, the Path or String interfaces for the file-based write operators provide the functionality needed. For example, passing in a directory implies each active stream of data in the application should write a unique file in the directory. This allows parallel writing to take place.
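A minimal sketch of the directory form follows. The String-plus-mode constructor is an assumption modeled on the sink-based constructor used in the example below, and the output path is illustrative.
// Target a directory: each active data stream writes a unique file
// within results/ratings, preserving parallel writes.
WriteDelimitedText writer =
    graph.add(new WriteDelimitedText("results/ratings", WriteMode.OVERWRITE));
// Note: setWriteSingleSink(true) is intentionally not set, so writes stay parallel.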
The only time the need arises to use a ByteSink directly is to compress the output data before writing. The BasicByteSink implementation is covered in detail below with usage examples.
For more information about the ByteSink interface and the classes that implement it, see the JavaDoc reference documentation.
BasicByteSink Operator
The BasicByteSink implementation specifies the use and type of compression when writing the target files. The following code example shows how to create a BasicByteSink specifying the type of compression to use. After the sink is created, it can be passed to a file write operator as the sink property.
Using the BasicByteSink operator
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.STRING;
import static com.pervasive.datarush.types.TokenTypeConstant.record;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.io.Paths;
import com.pervasive.datarush.io.WriteMode;
import com.pervasive.datarush.io.compression.CompressionFormats;
import com.pervasive.datarush.operators.io.BasicByteSink;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;
import com.pervasive.datarush.schema.TextRecord;
import com.pervasive.datarush.types.RecordTokenType;
/**
* Write the ratings file with gzip compression.
*/
public class WriteCompressedRatings {
    public static void main(String[] args) {
        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("WriteRatings");

        // Create the reader for the uncompressed input file
        ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
        reader.setFieldSeparator("::");
        reader.setHeader(true);
        RecordTokenType ratingsType = record(
            INT("userID"),
            INT("movieID"),
            INT("rating"),
            STRING("timestamp"));
        reader.setSchema(TextRecord.convert(ratingsType));

        // Write the ratings using compression
        BasicByteSink sink =
            new BasicByteSink(
                Paths.asPath("results/ratings.txt.gz"),
                CompressionFormats.lookupFormat("gzip"));
        WriteDelimitedText writer = graph.add(new WriteDelimitedText(sink, WriteMode.OVERWRITE));
        writer.setFieldSeparator("::");
        writer.setHeader(true);
        writer.setFieldDelimiter("");    // No delimiter (quoting) around field values
        writer.setWriteSingleSink(true); // Write a single output file (loss of parallelism)

        // Connect reader and writer
        graph.connect(reader.getOutput(), writer.getInput());

        // Compile and run the graph
        graph.run();
    }
}
Notes about the example:
• A BasicByteSink is constructed using a Path and the "gzip" compression format.
• The sink, with compression enabled, is passed as the sink property to the writer's constructor.
• When the graph is executed, the data will be written in delimited text format and compressed using "gzip" compression.