Building DataFlow Applications in Java : DataFlow Operator Library : Partitioning Data
 
Share this page                  
Partitioning Data
The DataFlow framework allows operators to specify their data needs through metadata. Metadata is used to set three aspects of data:
Type: Record data is typed. Each field has a name and a data type.
Ordering: Whether the data is sorted and, if so, by which fields and how.
Distribution: How data is distributed. Data can be evenly distributed, hash partitioned, range partitioned, randomly distributed, fully distributed, or not distributed at all.
In many cases when building a DataFlow application, you will not have to directly set metadata. Operators that care about data ordering or distribution will set their metadata accordingly. However, you may run into a case where you want to explicitly partition data according to a key value so that downstream operators work properly. Conversely, you may also want to gather all streams of data back together into one for some reduction. The following sections provide information on how to explicitly affect metadata.
Using the PartitionHint Operator to Explicitly Partition Data
Using the GatherHint Operator to Force a Data Gather
Using the Randomize Operator to Randomize Partitioning
Using the PartitionHint Operator to Explicitly Partition Data
The PartitionHint operator forces the input data to be partitioned into parallel streams of data for subsequent parallel operations. If requested, the output will also be sorted.
It is also possible to force data to be staged to disk, breaking physical streaming; consumers of the output will run in a physical graph executed after the physical graph executing this operator. Even if not forced, the (re)partitioning may cause staging to occur.
Usually these actions happen automatically in a logical graph as required, but this operator provides a mechanism for explicit control. For example, an application may be composed with operators that do not explicitly set their metadata, as they are normally distribution- and order-independent. In this particular case, explicitly setting the partitioning will yield the desired results. In this case, the PartitionHint operator can be injected as needed to produce the necessary results.
Code Example
The following code fragment demonstrates using the PartitionHint operator. The configuration specifies that the data will be hash partitioned by the field named "key". It also specifies that the data should be ordered (sorted) by the "key" and "value" fields. Operators connected downstream from the PartitionHint operator will be able to take advantage of the data re-partitioning and re-ordering accordingly.
Using the PartitionHint operator in Java
// Create the partition hint operator
PartitionHint part = graph.add(new PartitionHint());

// Hash partition the source data by the field "key"
part.setPartitioning(KeyDrivenDataDistribution.hashed("key"));

// Order the data by "key" (descending) and "value" (ascending)
part.setDataOrdering(new DataOrdering(Arrays.asList(SortKey.desc("key"), SortKey.asc("value"))));

// Connect a source data port to the partition hint operator
graph.connect(source, part.getInput());
Properties
The PartitionHint operator provides the following properties.
Name
Type
Description
dataOrdering
The data ordering of the output. The output is guaranteed to be ordered in the specified manner.
forceStaging
boolean
Indicates whether data must be staged to disk.
partitioning
The data distribution of the output. The output is guaranteed to be distributed in the specified manner.
Ports
The PartitionHint operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
The data to explicitly partition.
The PartitionHint operator provides a single output port.
Name
Type
Get Method
Description
output
getOutput()
The partitioned data.
Using the GatherHint Operator to Force a Data Gather
The GatherHint operator forces parallel streams of data to be gathered into a single nonparallel stream. If requested, the output will also be sorted.
It is also possible to force data to be staged to disk, breaking physical streaming; consumers of the output will run in a physical graph executed after the physical graph executing this operator. Even if not forced, the transition from parallel to non-parallel may cause staging to occur.
Usually these actions happen automatically in a logical graph as required, but this operator provides a mechanism for explicit control.
By definition, this is a nonparallel operator.
Code Example
Using the GatherHint operator in Java
// Create the gather hint operator in a graph
GatherHint gatherHint = graph.add(new GatherHint());

// Set the data ordering to fields: "key1" and "key2" (ascending). This is optional.
gatherHint.setDataOrdering(new DataOrdering(SortKey.asc("key1", "key2")));

// Force the data to be staged (optional).
gatherHint.setForceStaging(true);

// Connect data source to the gather hint operator
graph.connect(source, gatherHint.getInput());
Properties
The GatherHint operator provides the following properties.
Name
Type
Description
dataOrdering
(Optional) The data ordering of the output. The output is guaranteed to be ordered in the specified manner. If not specified, the current data ordering will be maintained.
forceStaging
boolean
Indicates whether data must be staged to disk. Default: false.
Ports
The GatherHint operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
Input data to be gathered and optionally reordered.
The GatherHint operator provides a single output port.
Name
Type
Get Method
Description
output
getOutput()
Resulting data gathered into a single stream and reordered as specified.
Using the Randomize Operator to Randomize Partitioning
The Randomize operator can be used to reorder the input in a random fashion. The output is identical to the input but ordered randomly, with each record on a random partition.
Code Example
Using the Randomize operator in Java
// Create the randomize operator in a graph
Randomize randomize = graph.add(new Randomize());

// Set the random seed
randomize.setSeed(123456789);

// Connect data source to the randomize operator
graph.connect(source, randomize.getInput());
Properties
The Randomize operator provides one property.
Name
Type
Description
seed
long
The random seed that will be used.
Ports
The Randomize operator provides a single input port.
Name
Type
Get Method
Description
input
getInput()
Input data to be randomly ordered.
The Randomize operator provides a single output port.
Name
Type
Get Method
Description
output
getOutput()
Original data in a random order.