Was this helpful?
Distributed Execution
Let’s now examine the details of processing the data set. For simplicity, we’ll assume that parallelism is set to two so that we only need to track two data partitions. Also, we will cover only the distributed case, knowing that the pseudo-distributed case is similar as described in the previous section.
Phase One Processing
Output of the Reader
Let’s assume that the output of the reader operation is as follows:
Partition 1
Partition 2
City
Date
Low
High
City
Date
Low
High
Austin
2012-07-01
73
89
Austin
2012-07-02
76
96
Boston
2012-07-01
71
91
Boston
2012-07-02
70
82
San Francisco
2012-07-01
58
67
San Francisco
2012-07-02
57
66
Seattle
2012-07-01
56
67
Seattle
2012-07-02
55
67
Austin
2012-07-03
77
99
Austin
2012-07-04
75
99
Boston
2012-07-03
67
85
Boston
2012-07-04
67
84
San Francisco
2012-07-03
55
68
San Francisco
2012-07-04
54
72
Seattle
2012-07-03
53
65
Seattle
2012-07-04
53
68
Austin
2012-07-05
75
94
Austin
2012-07-06
73
96
Boston
2012-07-05
68
78
Boston
2012-07-06
69
84
San Francisco
2012-07-05
55
64
San Francisco
2012-07-06
55
67
Seattle
2012-07-05
52
74
Seattle
2012-07-06
55
77
Output of FilterRows
In this step, the operator would then output all rows from the previous step except for those from 7/05/2012 and 7/06/2012 according to the following configured filter:
date >= '2012-07-01' and date <= '2012-07-04'
The output would therefore be:
Partition 1
Partition 2
City
Date
Low
High
Differ‑ence
City
Date
Low
High
Differ‑ence
Austin
2012-07-01
73
89
16
Austin
2012-07-02
76
96
20
Boston
2012-07-01
71
91
20
Boston
2012-07-02
70
82
12
San Francisco
2012-07-01
58
67
9
San Francisco
2012-07-02
57
66
9
Seattle
2012-07-01
56
67
11
Seattle
2012-07-02
55
67
12
Austin
2012-07-03
77
99
22
Austin
2012-07-04
75
99
24
Boston
2012-07-03
67
85
18
Boston
2012-07-04
67
84
17
San Francisco
2012-07-03
55
68
13
San Francisco
2012-07-04
54
72
18
Seattle
2012-07-03
53
65
12
Seattle
2012-07-04
53
68
15
Output of DeriveFields
In this step we calculate a new difference column equal to the difference between high and low from the following configured derivation:
difference=high-low
The output step of this would be:
Partition 1
Partition 2
City
Date
Low
High
Differ‑ence
City
Date
Low
High
Differ‑ence
Austin
2012-07-01
73
89
16
Austin
2012-07-02
76
96
20
Boston
2012-07-01
71
91
20
Boston
2012-07-02
70
82
12
San Francisco
2012-07-01
58
67
9
San Francisco
2012-07-02
57
66
9
Seattle
2012-07-01
56
67
11
Seattle
2012-07-02
55
67
12
Austin
2012-07-03
77
99
22
Austin
2012-07-04
75
99
24
Boston
2012-07-03
67
85
18
Boston
2012-07-04
67
84
17
San Francisco
2012-07-03
55
68
13
San Francisco
2012-07-04
54
72
18
Seattle
2012-07-03
53
65
12
Seattle
2012-07-04
53
68
15
Output of Group(partial)
In this step, we calculate per-partition maximum values of the difference field that we calculated in the previous step. The output would thus be:
Partition 1
Partition 2
City
Max Difference
City
Max Difference
Austin
22
Austin
24
Boston
20
Boston
17
San Francisco
13
San Francisco
18
Seattle
12
Seattle
15
As you can see:
22 is the maximum value for difference for Austin in partition 1 because it is the maximum value between 16 and 22.
24 is the maximum value for difference for Austin in partition 2 because it is the maximum value between 20 and 24
After this step, data is staged to disk because this is the last step in this phase.
Phase Two Processing
Output of Redistribute
Phase 2 begins by redistributing the data that was staged in the previous phase. The Redistribute operator ensures that rows of the same key live on the same partition. The repartitioning is somewhat arbitrary as long as a city does not span partitions. For this example, we’ll assign Boston and Austin to partition 1 and Seattle and San Francisco to partition 2.
The output of Redistribute would then be as follows:
Partition 1
Partition 2
City
Max Difference
City
Max Difference
Austin
22
San Francisco
13
Austin
24
San Francisco
18
Boston
20
Seattle
12
Boston
17
Seattle
15
Output of Group(final)
Now that the data has been redistributed, we can compute the final results:
Partition 1
Partition 2
City
Max Difference
City
Max Difference
Austin
24
San Francisco
18
Boston
20
Seattle
15
As you can see:
24 is the overall max for Austin because it is the max of 22 and 24
20 is the overall max for Boston because it is the max of 20 and 17
Pipelining within Phases
Within a given phase, the various operations are pipelined, with the results of one operation being processed by the next in the chain as soon as they are available. For example, here is a snapshot during the execution of phase 1.
/download/attachments/20480340/image2012-8-10%2012%3A57%3A54.png?version=1&modificationDate=1405715103097&api=v2
The various operators behave as follows:
1. FilterRows: Reads the next row and either writes it to the output or skips it, depending on whether it matches the specified filter.
2. DeriveFields: Reads the next row and writes it to the output, appending the calculated value.
3. Group(partials): Reads the next row, while maintaining a running maximum value for all rows for a given city. Afterr all rows for a given city have been read, it can output the maximum for that city.
To illustrate this more fully, let’s examine partition 1 of phase 1 in more detail. Let’s assume for the sake of simplicity that the data is already sorted. Let’s also assume that the data only includes the rows for Austin and Boston:
Row
City
Date
Low
High
1
Austin
2012-07-01
73
89
2
Austin
2012-07-03
77
99
3
Austin
2012-07-05
75
94
4
Boston
2012-07-01
71
91
5
Boston
2012-07-03
67
85
6
Boston
2012-07-05
68
78
Pipeline processing proceeds as follows:
Time
Current output of Reader
Current output of FilterRows
Current output of DeriveFields
Running Max, tracked by Group (partials)
Current output of Group (partials)
1
Row - 1
City - Austin
Date - 2012-07-01
Low - 73
High - 89
 
 
 
 
2
Row - 2
City - Austin
Date - 2012-07-03
Low - 77
High - 99
Row - 1
City - Austin
Low - 73
High - 89
 
 
 
3
Row - 3
City - Austin
Date - 2012-07-05
Low - 75
High - 94
Row - 2
City - Austin
Low - 77
High - 99
Row - 1
City - Austin
Difference - 16
 
 
4
Row - 4
City - Boston
Date - 2012-07-01
Low - 71
High - 91
 
Row - 2
City - Austin
Difference - 22
City - Austin
Max Difference - 16
 
5
Row - 5
City - Boston
Date - 2012-07-03
Low - 67
High - 85
Row - 4
City - Boston
Low - 71
High - 91
 
City - Austin
Max Difference - 22
 
6
Row - 6
City - Boston
Date - 2012-07-05
Low - 68
High - 78
Row - 5
City - Boston
Low - 67
High - 85
Row - 4
City - Boston
Difference - 20
City - Austin
Max Difference - 22
 
7
 
 
Row - 5
City - Boston
Difference - 18
City - Boston
Max Difference - 20
City - Austin
Max Difference - 22
In the previous table, the various rows correspond to clock ticks. Each column represents a buffer in the pipeline. The columns are ordered by processing order in the pipeline so that as you move down the table, data “moves” from left to right as it would in the pipeline.
Specifically:
1. At time 1, the Delimited Text Reader reads row 1
2. At time 2:
a. FilterRows outputs row 1
b. The Delimited Text Reader reads row 2
3. At time 3:
a. DeriveFields processes row 1
b. FilterRows outputs row 2
c. The Delimited Text Reader reads row 3
4. At time 4:
a. Group sets row 1 (Austin, 16) as its running max
b. DeriveFields processes row 2
c. FilterRows skips row 3 (notice the gap in the output of FilterRows)
d. The Delimited Text Reader reads row 4
5. At time 5:
a. Group replaces row 1 (Austin, 16) with row 2 (Austin, 22) as its running max
b. FilterRows outputs row 4
c. The Delimited Text Reader reads row 5
6. At time 6:
a. DeriveFields processes row 4
b. FilterRows outputs row 5
c. The Delimited Text Reader reads row 6
7. At time 7:
a. Group sees a new city, so it outputs the current running max (Austin, 22) and sets a new one (Boston, 20)
b. DeriveFields processes row 5
c. FilterRows skips row 6
8. At time 8, Group sees that the end-of-input has been reached, so it outputs the current running max (Boston, 20)
Note:  This example of pipeline processing is a simplified model. In practice, outputs are transferred in batches rather than row-by-row. The example also assumes that each step in the pipeline takes exactly the same amount of time. Finally, the description of Group is simplified based on the assumption that the input is already sorted by city.
 
Last modified date: 01/06/2023