Data Merging Operators
DataFlow supports multiple ways of merging data sets together. The most commonly used is the standard relational join. This is similar to a SQL join statement in that it joins two data sets based on key fields. Four join modes are supported: inner, full outer, left outer, and right outer. See the section on the standard relational join for more information.
A non-key-based join, also known as a cross join, produces a full Cartesian product of the input data sets. Unlike a standard relational join, no key fields need to be specified. The resultant data will join each row in the left side of the data with every row in the right side of the data. This results in an exponential increase of the input data and so should be used with caution.
In certain conditions data sets need to be merged together into a single set without using a relational or cross-product join. This can be accomplished by the union of the two data sets. This is similar to the SQL union statement. See the section on data unions for more information.
For more information, refer to the following topics:
Join Operator
The
Join operator performs a relational equi-join on two input data sets by a specified set of keys. The four standard join modes are supported. See the documentation on the joinMode property below for more information on join modes.
The join can also be optionally constrained by a predicate expression. When a predicate expression is used with join, rows will only match if their key field values are equal and the given predicate expression returns true. This allows the join user to specify flexible constraints on how to join the input data.
The predicate expression can involve fields from both input data sets. Be aware that the predicate expression is run against the data within the namespace of the joined data. At this point the fields from the left side and right side data set are merged together and any name conflicts have been resolved. Predicate expressions for joins are optional. If a predicate expression is not specified, the join condition is the default: match on key field values.
Depending on the value of the hashJoinHint property, one of two procedures is used:
1. If hashJoinHint is false, input data will be sorted and hash partitioned by the specified keys (if not already sorted and partitioned). Once sorted and partitioned, data is is then combined in a streaming fashion. Note that if a key group contains multiple rows on both sides, then all combinations of left and right rows in the key group appear in the output for that key group. In this case, memory consumption is lower if the side with more key repetitions is linked to the left input.
2. If hashJoinHint is true, a full copy of the data from the right will be distributed to the cluster and loaded into memory within each node in the cluster. The left side will not be sorted or partitioned. When enabling the hashJoinHint, the right side data should be relatively small since hashjoin’s memory consumption increases with the size of the right data set. This can lead to out-of-memory errors if there is not enough memory available on the partition to keep the entire right side's hash table in memory during the join.
Tips:
• When using a very small right data set with a hash join it may improve performance if parallelism is disabled for the reader on the right side if no other data manipulations are being performed.
• Depending on the memory used during the hash joins on the cluster, it may be useful to adjust the Garbage Collector settings on the worker JVMs to avoid the out-of-memory errors when it is caused occasionally. Use the option -XX:+UseG1GC to improve the performance in a few cases. If out of memory errors occur frequently with the hash joins, you must increase the memory available on the workers or reduce the size of the data read on the right side.
Note: hashJoinHint is currently ignored if performing a full outer or right outer join.
Code Example
The following example performs a join between two data sets. One contains user ratings of movies and the other contains movie information.
The application reads the input data, joins the data using the "movieID" field, and then writes the results. A hash join is used by enabling the hashJoinHint property. This implies the movie information data set will be loaded into memory to speed the join processing. Because a left outer join is being used, all of the rows of user ratings will be included in the output. User ratings without a corresponding movie record will have null values for all movie data set fields.
Using the Join operator in Java
// Join two datasets by the "movieID" field
Join joiner = graph.add(new Join());
joiner.setJoinKeys(JoinKey.keys("movieID")); // Join on movieID field
joiner.setJoinMode(JoinMode.LEFT_OUTER); // Use a left outer join
joiner.setUseHashJoinHint(true); // Enable hash join
joiner.setMergeLeftAndRightKeys(true); // Only output one set of key values
Using the Join operator in RushScript
// Two data sources have to be passed into the join operator
var joinedData = dr.join(leftData, rightData, {joinKeys:'movieID', joinMode:'LEFT_OUTER', useHashJoinHint:true, mergeLeftAndRightKeys:true});
The following Java code snippet can be used to set a predicate expression that will be used during the join processing. Rows will only be joined if their key field values match and the predicate evaluates to true.
Using a Predicate Expression in Join
// Use the SQL-like predicate syntax for specifying the join condition
joiner.setJoinCondition("rating = 5 and genre like \"Drama\"");
Properties
The
Join operator has the following properties.
Ports
The
Join operator provides the following input ports.
The
Join operator provides a single output port.
CrossJoin Operator
The
CrossJoin operator produces the Cartesian product of two sets of records. The output is typed using the merge of the two input types; if the right type contains a field already named in the left type, it will be renamed to avoid collision.
To generate the output record pairs, the right side data is temporarily stored to disk for multiple passes. The left side data is read into memory in chunks, which are then used to generate a full set of pairs with the iterable right side data. This process is repeated until the left side data is exhausted.
No guarantee is made in respect to the ordering of the output data.
Caution! The CrossJoin operator produces a product of its two input data sets. Even with fairly small data sets the amount of output can be large. Use with caution.
Code Example
The following code example reads user ratings and movie listings. The two data sets are joined using the
CrossJoin operator. The results are written to a local file.
Using the CrossJoin operator in Java
// Use CrossJoin to create the Cartesian product
CrossJoin joiner = graph.add(new CrossJoin());
joiner.setBufferRows(100000); // Specify the buffer size in rows
Using the CrossJoin operator in RushScript
var joinedData = dr.crossJoin(leftData, rightData, {bufferRows:100000});
Properties
The
CrossJoin operator has the following property.
Ports
The
CrossJoin operator provides the following input ports.
The
CrossJoin operator provides a single output port.
UnionAll Operator
The
UnionAll operator provides an unordered union of two input data sources. The output will contain all of the rows of both input data sets. This is similar to the UNION ALL construct available in standard SQL. The input data is usually consumed as it becomes available and as such, order of the output data is non-deterministic. However, if the data order specified in the metadata of both inputs match, then the operator will perform a sorted merge preserving the sort order of the input. The metadata specifying the data order can be set through
Sort Operator or
AssertSorted Operator.
The type of the output is determined by setting the union mode. If the outputMapping setting is set to MAPBYSCHEMA, then a schema must be provided that will define the output type. Otherwise the output type can be automatically determined by setting MAPBYPOSITION or MAPBYNAME, which will determine an appropriate output type by mapping the two inputs by position or name, respectively.
In the case where a target schema is provided, the input fields are mapped to output fields based on field name. If a field in the output is not contained in the target schema, the field is dropped. If a field is contained in the output schema, but not in the input, the output field will contain NULL values. Input values are converted into the specified output field type where possible.
For example, if the left input contains fields {a:int, b:int, c:string} and the right input contains fields {a:long, b:double, d:string} and the target schema specifies fields {a:double, c:string, d:date, e:string} then the following is true:
• The output port schema will contain the fields {a:double, c:string, d:date, e:string}.
• Field b is ignored from both inputs.
• When obtaining data from the left input port, the output fields {d, e} will contain null values.
• When obtaining data from the right input port, the output fields {c, e} will contain null values.
• Field a from the left input will be converted from an integer type to a double type.
• Field a from the right input will be converted from a long type to a double type.
• Field d from the right input will be converted from a string type to a date type. The format pattern specified in the target schema will be used for the conversion, if specified.
In the case where a target schema is not provided, the two input ports must have compatible types. In this case the operator will try to determine valid output fields based on the left and right input and two settings.
The outputMapping setting can be set to MAPBYNAME if the left and right side should be matched by field name. Otherwise the operator can use MAPBYPOSITION and they will be matched by position.
Additionally the includeExtraFields setting can be set to true if fields that are only present on one side of the input should be retained. If the field is not present in one of the inputs it will contain NULL values. If extra fields are present in one of the inputs and includeExtraFields is set to false, the extra fields will be dropped.
Code Examples
The following example demonstrates creating a UnionAll operator within a logical graph. The outputs from two separate operators are connected to the left and right inputs of the union operator. The creation and the graph and the source operators are not shown. The output of the union operator can be accessed using the getOutput() method.
Using the UnionAll operator in Java
// Use UnionAll to create the union of the two datasets
UnionAll unionOp = graph.add(new UnionAll());
graph.connect(source1.getOutput(), unionOp.getLeft());
graph.connect(source2.getOutput(), unionOp.getRight());
The following example demonstrates using the UnionAll operator within RushScript. This example is setting the schema property of UnionAll to coerce the two inputs into the provided target schema.
Using the UnionAll operator in RushScript
// Define a schema that is usable by the union operator
var irisschema = dr.schema()
.nullable(true)
.trimmed(true)
.nullIndicator("?")
.DOUBLE("sepal length")
.DOUBLE("sepal width")
.DOUBLE("petal length")
.DOUBLE("petal width")
.STRING("class", "Iris-setosa", "Iris-versicolor", "Iris-virginica");
// Apply union to two data sets using the defined schema.
var results = dr.unionAll(data1, data2, {schema:irisschema});
Properties
The
UnionAll operator supports the following properties.
Ports
The
UnionAll operator provides the following input ports.
The
UnionAll operator provides a single output port. The output port will contain all rows from the two inputs. The schema of the output port will match the provided target schema. If one is not provided, the output port type will match the type of the left hand side input port.