Partitioning Data
The DataFlow framework allows operators to specify their data needs through metadata. Metadata is used to set three aspects of data:
• Type: Record data is typed. Each field has a name and a data type.
• Ordering: Whether the data is sorted and, if so, by which fields and how.
• Distribution: How data is distributed. Data can be evenly distributed, hash partitioned, range partitioned, randomly distributed, fully distributed, or not distributed at all.
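To make two of the distribution terms concrete, the following plain-Java sketch models partitions as in-memory lists. It compares an even distribution (round-robin is used here as one simple way to spread records evenly) with a range partitioning. This is a conceptual illustration only, not the DataFlow API. The class `DistributionSketch`, its methods, and the partition bounds are hypothetical, and a DataFlow engine may assign records to partitions differently.

```java
import java.util.ArrayList;
import java.util.List;

public class DistributionSketch {
    // Evenly distributes values across partitions in round-robin order.
    static List<List<Integer>> roundRobin(List<Integer> values, int partitions) {
        List<List<Integer>> out = new ArrayList<>();
        for (int p = 0; p < partitions; p++) out.add(new ArrayList<>());
        for (int i = 0; i < values.size(); i++) {
            out.get(i % partitions).add(values.get(i));
        }
        return out;
    }

    // Range partitions values using sorted, exclusive upper bounds:
    // v < bounds[0] goes to partition 0, v < bounds[1] to partition 1, and so on;
    // everything else goes to the last partition.
    static List<List<Integer>> range(List<Integer> values, int[] bounds) {
        List<List<Integer>> out = new ArrayList<>();
        for (int p = 0; p <= bounds.length; p++) out.add(new ArrayList<>());
        for (int v : values) {
            int p = 0;
            while (p < bounds.length && v >= bounds[p]) p++;
            out.get(p).add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(5, 8, 17, 23, 31, 42);
        System.out.println(roundRobin(data, 3));             // [[5, 23], [8, 31], [17, 42]]
        System.out.println(range(data, new int[] {10, 30})); // [[5, 8], [17, 23], [31, 42]]
    }
}
```

Round-robin balances partition sizes without regard to values. Range partitioning keeps neighboring values together, which matters when a downstream operator needs a contiguous key range in one place.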
In many cases when building a DataFlow application, you will not need to set metadata directly. Operators that depend on data ordering or distribution set their metadata accordingly. However, you may run into a case where you want to explicitly partition data by a key value so that downstream operators that rely on that partitioning work properly. Conversely, you may want to gather all parallel streams of data back into a single stream for a reduction. The following sections describe how to affect metadata explicitly.
Using the PartitionHint Operator to Explicitly Partition Data
The PartitionHint operator forces the input data to be partitioned into parallel streams of data for subsequent parallel operations. If requested, the output will also be sorted.
It is also possible to force data to be staged to disk, breaking physical streaming; consumers of the output will run in a physical graph executed after the physical graph executing this operator. Even if not forced, the (re)partitioning may cause staging to occur.
Usually these actions happen automatically in a logical graph as required, but this operator provides a mechanism for explicit control. For example, an application may be composed of operators that do not explicitly set their metadata because they are normally independent of distribution and ordering. If such an application nonetheless needs its data partitioned or sorted in a particular way, the PartitionHint operator can be injected where needed to produce the required results.
Code Example
The following code fragment demonstrates using the PartitionHint operator. The configuration specifies that the data will be hash partitioned by the field named "key". It also specifies that the data should be ordered (sorted) by "key" in descending order and then by "value" in ascending order. Operators connected downstream from the PartitionHint operator can take advantage of the resulting partitioning and ordering.
Using the PartitionHint operator in Java
// Create the partition hint operator
PartitionHint part = graph.add(new PartitionHint());
// Hash partition the source data by the field "key"
part.setPartitioning(KeyDrivenDataDistribution.hashed("key"));
// Order the data by "key" (descending) and "value" (ascending)
part.setDataOrdering(new DataOrdering(Arrays.asList(SortKey.desc("key"), SortKey.asc("value"))));
// Connect a source data port to the partition hint operator
graph.connect(source, part.getInput());
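What the configuration above requests can be simulated in plain Java: rows are assigned to partitions by hashing "key", and each partition is then sorted by "key" descending and "value" ascending. This is a conceptual sketch, not the DataFlow implementation. The `PartitionHintSketch` class, the `Row` record, the partition count, and the use of `String.hashCode()` with `Math.floorMod` as the hash function are all assumptions; DataFlow's own hash function and degree of parallelism may differ. The property that carries over is that all rows sharing a key land in the same partition.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class PartitionHintSketch {
    // Hypothetical two-field record with a "key" and a "value" field.
    record Row(String key, int value) {}

    // Picks a partition by hashing the key; floorMod keeps the index
    // non-negative even when hashCode() is negative.
    static int partitionOf(Row row, int partitions) {
        return Math.floorMod(row.key().hashCode(), partitions);
    }

    // Hash partitions the rows by key, then sorts each partition by
    // key (descending) and value (ascending).
    static List<List<Row>> partitionAndSort(List<Row> rows, int partitions) {
        List<List<Row>> out = new ArrayList<>();
        for (int p = 0; p < partitions; p++) out.add(new ArrayList<>());
        for (Row r : rows) out.get(partitionOf(r, partitions)).add(r);

        Comparator<Row> byKeyDesc = Comparator.comparing(Row::key, Comparator.reverseOrder());
        Comparator<Row> order = byKeyDesc.thenComparingInt(Row::value);
        for (List<Row> part : out) part.sort(order);
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row("a", 3), new Row("b", 1), new Row("a", 1), new Row("c", 2));
        // Prints: [[Row[key=b, value=1]], [Row[key=c, value=2], Row[key=a, value=1], Row[key=a, value=3]]]
        System.out.println(partitionAndSort(rows, 2));
    }
}
```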
Properties
The PartitionHint operator provides the following properties.
Ports
The PartitionHint operator provides a single input port.
The PartitionHint operator provides a single output port.
Using the GatherHint Operator to Force a Data Gather
The GatherHint operator forces parallel streams of data to be gathered into a single nonparallel stream. If requested, the output will also be sorted.
It is also possible to force data to be staged to disk, breaking physical streaming; consumers of the output will run in a physical graph executed after the physical graph executing this operator. Even if not forced, the transition from parallel to nonparallel may cause staging to occur.
Usually these actions happen automatically in a logical graph as required, but this operator provides a mechanism for explicit control.
By definition, this is a nonparallel operator.
Code Example
Using the GatherHint operator in Java
// Create the gather hint operator in a graph
GatherHint gatherHint = graph.add(new GatherHint());
// Set the data ordering to fields: "key1" and "key2" (ascending). This is optional.
gatherHint.setDataOrdering(new DataOrdering(SortKey.asc("key1", "key2")));
// Force the data to be staged (optional).
gatherHint.setForceStaging(true);
// Connect data source to the gather hint operator
graph.connect(source, gatherHint.getInput());
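When sorted output is requested, a gather can be pictured as a k-way merge: if each parallel stream is already sorted by "key1" and "key2", repeatedly taking the smallest head row across all streams yields one sorted stream. The plain-Java sketch below shows that merge with a priority queue. It assumes the input partitions are already sorted and is not a description of how GatherHint is implemented internally; the `GatherSketch` class and its `Row` and `Cursor` records are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class GatherSketch {
    // Hypothetical record with two integer sort keys.
    record Row(int key1, int key2) {}

    // Ascending order on key1, then key2.
    static final Comparator<Row> ORDER =
            Comparator.comparingInt(Row::key1).thenComparingInt(Row::key2);

    // Position of the next unread row within one partition.
    record Cursor(int partition, int index) {}

    // Merges partitions that are each already sorted by ORDER into one sorted list.
    static List<Row> gather(List<List<Row>> partitions) {
        PriorityQueue<Cursor> heads = new PriorityQueue<>(
                Comparator.comparing((Cursor c) -> partitions.get(c.partition()).get(c.index()), ORDER));
        for (int p = 0; p < partitions.size(); p++) {
            if (!partitions.get(p).isEmpty()) heads.add(new Cursor(p, 0));
        }
        List<Row> out = new ArrayList<>();
        while (!heads.isEmpty()) {
            Cursor c = heads.poll();
            List<Row> part = partitions.get(c.partition());
            out.add(part.get(c.index()));
            // Advance this partition's cursor if it still has rows.
            if (c.index() + 1 < part.size()) heads.add(new Cursor(c.partition(), c.index() + 1));
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Row>> parts = List.of(
                List.of(new Row(1, 1), new Row(3, 0)),
                List.of(new Row(1, 0), new Row(2, 5)),
                List.of());
        // Prints: [Row[key1=1, key2=0], Row[key1=1, key2=1], Row[key1=2, key2=5], Row[key1=3, key2=0]]
        System.out.println(gather(parts));
    }
}
```

A merge like this only needs one row per partition in memory at a time; if the partitions were not already sorted, a full sort of the gathered data would be required instead.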
Properties
The GatherHint operator provides the following properties.
Ports
The GatherHint operator provides a single input port.
The GatherHint operator provides a single output port.
Using the Randomize Operator to Randomize Partitioning
The Randomize operator reorders the input randomly. The output contains exactly the same records as the input, but in random order, with each record placed on a random partition.
Code Example
Using the Randomize operator in Java
// Create the randomize operator in a graph
Randomize randomize = graph.add(new Randomize());
// Set the random seed
randomize.setSeed(123456789);
// Connect data source to the randomize operator
graph.connect(source, randomize.getInput());
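The effect of Randomize with a fixed seed can be approximated in plain Java with a seeded `java.util.Random`: shuffle the records, then deal each one to a randomly chosen partition. This sketch is an assumption-laden stand-in, not the operator's implementation. The `RandomizeSketch` class is hypothetical, and because DataFlow's random number generator is not described here, the same seed will not reproduce DataFlow's actual output. What it does show is that a fixed seed makes the result repeatable and that the set of records is unchanged.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class RandomizeSketch {
    // Shuffles the records with a seeded generator, then assigns each record
    // to a randomly chosen partition. The same seed yields the same result.
    static List<List<Integer>> randomize(List<Integer> records, int partitions, long seed) {
        Random rng = new Random(seed);
        List<Integer> shuffled = new ArrayList<>(records);
        Collections.shuffle(shuffled, rng);

        List<List<Integer>> out = new ArrayList<>();
        for (int p = 0; p < partitions; p++) out.add(new ArrayList<>());
        for (int r : shuffled) out.get(rng.nextInt(partitions)).add(r);
        return out;
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        // Same seed as the example above; the exact layout depends on java.util.Random.
        System.out.println(randomize(records, 3, 123456789L));
    }
}
```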
Properties
The Randomize operator provides one property.
Ports
The Randomize operator provides a single input port.
The Randomize operator provides a single output port.