Data Merging Operators
DataFlow supports multiple ways of merging data sets together. The most commonly used is the standard relational join. This is similar to a SQL join statement in that it joins two data sets based on key fields. Four join modes are supported: inner, full outer, left outer, and right outer. See the section on the standard relational join for more information.
A non-key-based join, also known as a cross join, produces the full Cartesian product of the input data sets. Unlike a standard relational join, no key fields need to be specified. The result pairs each row on the left side with every row on the right side, so the output row count is the product of the two input row counts. This multiplicative growth can make the output very large, so the operation should be used with caution.
In certain conditions data sets need to be merged together into a single set without using a relational or cross-product join. This can be accomplished by the union of the two data sets. This is similar to the SQL union statement. See the section on data unions for more information.
For more information, refer to the following topics: Join Operator, CrossJoin Operator, and UnionAll Operator.
Join Operator
The Join operator performs a relational equi-join on two input data sets by a specified set of keys. The four standard join modes are supported. See the documentation on the joinMode property below for more information on join modes.
The join can also be optionally constrained by a predicate expression. When a predicate expression is used, rows match only if their key field values are equal and the predicate expression evaluates to true. This allows flexible constraints to be placed on how the input data is joined.
The predicate expression can reference fields from both input data sets. Be aware that the expression is evaluated in the namespace of the joined data: at that point the fields from the left and right data sets have been merged and any name conflicts have been resolved. Predicate expressions for joins are optional. If a predicate expression is not specified, the default join condition applies: match on key field values.
Depending on the value of the hashJoinHint property, one of two procedures is used:
1. If hashJoinHint is false, input data will be sorted and hash partitioned by the specified keys (if not already sorted and partitioned). Once sorted and partitioned, the data is combined in a streaming fashion. Note that if a key group contains multiple rows on both sides, all combinations of left and right rows in that key group appear in the output. In this case, memory consumption is lower if the side with more key repetitions is connected to the left input.
2. If hashJoinHint is true, a full copy of the right side data will be distributed to the cluster and loaded into memory within each node. The left side will not be sorted or partitioned. When enabling hashJoinHint, the right side data should be relatively small, since the hash join's memory consumption grows with the size of the right data set. This can lead to out-of-memory errors if a partition does not have enough memory to keep the entire right side's hash table in memory during the join.
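Conceptually, the hash join procedure builds an in-memory hash table from the right side and then streams the left side against it, which is why memory consumption tracks the size of the right data set. The following standalone sketch in plain Java (not the DataFlow API; the sample keys and values are made up) illustrates this build-and-probe pattern, including the left outer padding behavior.
Conceptual hash join sketch in Java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {
    public static void main(String[] args) {
        // Build phase: load the (small) right side into an in-memory hash table keyed on movieID.
        // Memory use grows with the size of the right side, so it should be the smaller input.
        Map<Integer, List<String>> rightByKey = new HashMap<>();
        rightByKey.computeIfAbsent(1, k -> new ArrayList<>()).add("Toy Story");
        rightByKey.computeIfAbsent(2, k -> new ArrayList<>()).add("Jumanji");

        // Probe phase: stream the (large) left side; no sorting or repartitioning is needed.
        int[][] leftRows = { {1, 5}, {2, 3}, {3, 4} };   // {movieID, rating}
        for (int[] row : leftRows) {
            List<String> matches = rightByKey.get(row[0]);
            if (matches != null) {
                // Inner match: emit one output row per matching right row.
                for (String title : matches) {
                    System.out.println(row[0] + "," + row[1] + "," + title);
                }
            } else {
                // Left outer behavior: keep unmatched left rows, padding right fields with null.
                System.out.println(row[0] + "," + row[1] + ",null");
            }
        }
    }
}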
Tips:
When using a very small right data set with a hash join, disabling parallelism for the reader on the right side may improve performance, provided no other data manipulations are being performed on that side.
Depending on the memory used during hash joins on the cluster, it may be useful to adjust the garbage collector settings on the worker JVMs to avoid occasional out-of-memory errors. The -XX:+UseG1GC option can improve performance in some cases. If out-of-memory errors occur frequently with hash joins, increase the memory available on the workers or reduce the size of the data read on the right side.
Note:  hashJoinHint is currently ignored if performing a full outer or right outer join.
Code Example
The following example performs a join between two data sets. One contains user ratings of movies and the other contains movie information.
The application reads the input data, joins the data using the "movieID" field, and then writes the results. A hash join is used by enabling the hashJoinHint property, which means the movie information data set will be loaded into memory to speed up join processing. Because a left outer join is being used, all of the rows of user ratings will be included in the output. User ratings without a corresponding movie record will have null values for all movie data set fields.
Using the Join operator in Java
// Join two datasets by the "movieID" field
Join joiner = graph.add(new Join());
joiner.setJoinKeys(JoinKey.keys("movieID"));    // Join on movieID field
joiner.setJoinMode(JoinMode.LEFT_OUTER);        // Use a left outer join
joiner.setUseHashJoinHint(true);                // Enable hash join
joiner.setMergeLeftAndRightKeys(true);          // Only output one set of key values
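The snippet above configures only the join operator. The sketch below shows how it might be wired into the rest of the graph; ratingsReader, moviesReader, and resultsWriter are hypothetical placeholders for source and sink operators (for example, delimited text readers and a writer) that are not shown here, and the final execution call assumes the standard LogicalGraph run method.
Wiring the Join operator into a graph (sketch)
// Assumed: graph is an existing LogicalGraph, and ratingsReader, moviesReader, and
// resultsWriter are hypothetical source and sink operators already added to it.
graph.connect(ratingsReader.getOutput(), joiner.getLeft());    // large ratings data on the left
graph.connect(moviesReader.getOutput(), joiner.getRight());    // small movie data on the right (held in memory)
graph.connect(joiner.getOutput(), resultsWriter.getInput());   // write the joined rows
graph.run();                                                   // execute the graph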
Using the Join operator in RushScript
// Two data sources have to be passed into the join operator
var joinedData = dr.join(leftData, rightData, {joinKeys:'movieID', joinMode:'LEFT_OUTER', useHashJoinHint:true, mergeLeftAndRightKeys:true});
The following Java code snippet sets a predicate expression that will be applied during join processing. Rows will only be joined if their key field values match and the predicate evaluates to true.
Using a Predicate Expression in Join
// Use the SQL-like predicate syntax for specifying the join condition
joiner.setJoinCondition("rating = 5 and genre like \"Drama\"");
Properties
The Join operator has the following properties.
joinCondition: If non-null, the condition is evaluated for every possible matching row. Only rows whose keys match and for which the condition returns true are output by an inner join. When determining the additional rows to output for an outer join, the operator outputs those rows for which the join condition never evaluated to true. Note that, if specified, the left fields and right fields must be distinct from each other, because the condition is evaluated in a namespace containing all fields from the left and right.
useHashJoinHint (boolean): Controls whether to perform the join as a hash join. If so, all rows on the right side will be loaded into memory and joined with the left. This is an optimization for the case where the right side is small enough to fit into memory, since it means the left side need not be sorted or repartitioned.
joinKeys: List of keys to use for the join. Each element contains the pair (leftName, rightName), where leftName is the name of the field to join on the left and rightName is the name of the field to join on the right.
mergeLeftAndRightKeys (boolean): Controls whether to merge the left and right key fields into a single field for output. If true, output keys will be named after the left. If false, left and right keys will be output independently. Default: false.
joinMode: Specifies how the operator should handle rows with unmatched keys. These are rows from one input of the join that do not join with any row from the other input. Such rows may be dropped from the join, or retained, with nulls used to populate fields for which data is not available. Possible values are:
INNER (default): Performs an inner join; discards input rows with unmatched keys.
FULL_OUTER: Performs a full outer join; retains rows with unmatched keys from both inputs.
LEFT_OUTER: Performs a left outer join; retains rows from the left input with unmatched keys, but discards such rows from the right input.
RIGHT_OUTER: Performs a right outer join; retains rows from the right input with unmatched keys, but discards such rows from the left input.
Ports
The Join operator provides the following input ports.
left (getLeft()): The left side of the join.
right (getRight()): The right side of the join. This should generally be connected to the smaller of the two inputs.
The Join operator provides a single output port.
output (getOutput()): The joined data.
CrossJoin Operator
The CrossJoin operator produces the Cartesian product of two sets of records. The output is typed using the merge of the two input types; if the right type contains a field already named in the left type, it will be renamed to avoid collision.
To generate the output record pairs, the right side data is temporarily stored to disk for multiple passes. The left side data is read into memory in chunks, which are then used to generate a full set of pairs with the iterable right side data. This process is repeated until the left side data is exhausted.
No guarantee is made with respect to the ordering of the output data.
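The following standalone sketch in plain Java (not the DataFlow API; the chunk size and sample data are made up) illustrates the chunked pairing strategy described above: the left side is processed in fixed-size chunks, and each chunk is paired with every right side row on a separate pass.
Conceptual cross join sketch in Java
import java.util.Arrays;
import java.util.List;

public class CrossJoinSketch {
    public static void main(String[] args) {
        List<String> left = Arrays.asList("L1", "L2", "L3", "L4", "L5");
        List<String> right = Arrays.asList("R1", "R2");   // stands in for the disk-buffered right side
        int chunkSize = 2;                                 // stands in for the bufferRows property

        // Read the left side into memory one chunk at a time; each chunk triggers
        // one full pass over the right side, emitting every (left, right) pair.
        for (int start = 0; start < left.size(); start += chunkSize) {
            List<String> chunk = left.subList(start, Math.min(start + chunkSize, left.size()));
            for (String r : right) {
                for (String l : chunk) {
                    System.out.println(l + "," + r);   // one output row per pair
                }
            }
        }
    }
}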
Caution!  The CrossJoin operator produces a product of its two input data sets. Even with fairly small data sets the amount of output can be large. Use with caution.
Code Example
The following code example reads user ratings and movie listings. The two data sets are joined using the CrossJoin operator. The results are written to a local file.
Using the CrossJoin operator in Java
// Use CrossJoin to create the Cartesian product
CrossJoin joiner = graph.add(new CrossJoin());
joiner.setBufferRows(100000);                   // Specify the buffer size in rows
Using the CrossJoin operator in RushScript
var joinedData = dr.crossJoin(leftData, rightData, {bufferRows:100000});
Properties
The CrossJoin operator has the following property.
bufferRows (int): The size (in rows) of the memory buffer used to cache input rows for join processing. Larger values can increase performance due to decreased intermediate file buffering. Default: 10,000 rows.
Ports
The CrossJoin operator provides the following input ports.
left (getLeftInput()): The left side of the join.
right (getRightInput()): The right side of the join. This should generally be connected to the smaller of the two inputs.
The CrossJoin operator provides a single output port.
output (getOutput()): The joined data.
UnionAll Operator
The UnionAll operator provides an unordered union of two input data sources. The output will contain all of the rows of both input data sets. This is similar to the UNION ALL construct in standard SQL. The input data is usually consumed as it becomes available, so the order of the output data is non-deterministic. However, if the data order specified in the metadata of both inputs matches, the operator will perform a sorted merge, preserving the sort order of the input. The metadata specifying the data order can be established with the Sort Operator or the AssertSorted Operator.
The type of the output is determined by the outputMapping setting (the union mode). If outputMapping is set to MAPBYSCHEMA, a schema must be provided to define the output type. Otherwise the output type can be determined automatically by setting MAPBYPOSITION or MAPBYNAME, which map the fields of the two inputs by position or by name, respectively.
In the case where a target schema is provided, the input fields are mapped to output fields by field name. If an input field is not contained in the target schema, that field is dropped. If a field is contained in the target schema but not in an input, the corresponding output field will contain NULL values. Input values are converted into the specified output field type where possible.
For example, if the left input contains fields {a:int, b:int, c:string} and the right input contains fields {a:long, b:double, d:string} and the target schema specifies fields {a:double, c:string, d:date, e:string} then the following is true:
The output port schema will contain the fields {a:double, c:string, d:date, e:string}.
Field b is ignored from both inputs.
When obtaining data from the left input port, the output fields {d, e} will contain null values.
When obtaining data from the right input port, the output fields {c, e} will contain null values.
Field a from the left input will be converted from an integer type to a double type.
Field a from the right input will be converted from a long type to a double type.
Field d from the right input will be converted from a string type to a date type. The format pattern specified in the target schema will be used for the conversion, if specified.
In the case where a target schema is not provided, the two input ports must have compatible types. In this case the operator will try to determine valid output fields based on the left and right input and two settings.
The outputMapping setting can be set to MAPBYNAME if the left and right side should be matched by field name. Otherwise the operator can use MAPBYPOSITION and they will be matched by position.
Additionally the includeExtraFields setting can be set to true if fields that are only present on one side of the input should be retained. If the field is not present in one of the inputs it will contain NULL values. If extra fields are present in one of the inputs and includeExtraFields is set to false, the extra fields will be dropped.
Code Examples
The following example demonstrates creating a UnionAll operator within a logical graph. The outputs from two separate operators are connected to the left and right inputs of the union operator. The creation of the graph and of the source operators is not shown. The output of the union operator can be accessed using the getOutput() method.
Using the UnionAll operator in Java
// Use UnionAll to create the union of the two datasets
UnionAll unionOp = graph.add(new UnionAll());
graph.connect(source1.getOutput(), unionOp.getLeft());
graph.connect(source2.getOutput(), unionOp.getRight());
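The example above relies on the default MAPBYPOSITION mapping. As a sketch only, the outputMapping and includeExtraFields settings described earlier might be applied as shown below; the setter names are assumed to follow JavaBean conventions for the documented property names, so verify them against the API reference.
Setting UnionAll mapping options in Java (sketch)
// Assumed setter names derived from the documented property names; verify against the API reference.
unionOp.setOutputMapping(UnionMode.MAPBYNAME);   // map the left and right fields by name
unionOp.setIncludeExtraFields(true);             // retain fields present on only one side, padded with nulls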
The following example demonstrates using the UnionAll operator within RushScript. This example sets the schema property of UnionAll to coerce the two inputs into the provided target schema.
Using the UnionAll operator in RushScript
// Define a schema that is usable by the union operator
var irisschema = dr.schema()
    .nullable(true)
    .trimmed(true)
    .nullIndicator("?")
    .DOUBLE("sepal length")
    .DOUBLE("sepal width")
    .DOUBLE("petal length")
    .DOUBLE("petal width")
    .STRING("class", "Iris-setosa", "Iris-versicolor", "Iris-virginica");
 
// Apply union to two data sets using the defined schema.
var results = dr.unionAll(data1, data2, {schema:irisschema});
Properties
The UnionAll operator supports the following properties.
includeExtraFields (boolean): Whether fields that are only present on one side of the input are retained when the output is automatically determined through MAPBYPOSITION or MAPBYNAME mode. Default: false.
outputMapping (UnionMode): Determines the output type of the operator. Can be set to MAPBYPOSITION, MAPBYNAME, or MAPBYSCHEMA. If either of the first two options is used, the output type is determined automatically by mapping the fields of the two inputs positionally or by name, respectively. If MAPBYSCHEMA is used, the schema property must be set. Default: MAPBYPOSITION.
schema (RecordTextSchema): The target schema that defines the type of the output port of the union operator. When provided, the input data will be converted into the form specified by this schema. This is an optional property that implies MAPBYSCHEMA mode.
Ports
The UnionAll operator provides the following input ports.
left (getLeft()): The left side of the union.
right (getRight()): The right side of the union.
The UnionAll operator provides a single output port. The output port will contain all rows from the two inputs. The schema of the output port will match the provided target schema. If one is not provided, the output port type will match the type of the left hand side input port.
output (getOutput()): The union of the two inputs.
Last modified date: 03/10/2025