In the last few sessions we focused on operations like map, filter, and flatMap, and we got a lot of intuition about how these sorts of operations are distributed over a cluster; for example, how a distributed map or a distributed filter is done. Along the way we also got a little bit of intuition about the anatomy of a Spark job, and where these operations are actually running. But so far, we haven't focused on distributed reduction operations. In this session, we're going to focus on reduction operations in Spark.

So far, we've defined distributed data parallelism, and we saw intuitively that Spark implements this model. Most importantly, in the first few sessions I hope you got a feel for what latency really means to a distributed system. Then we covered Spark's programming model. In the beginning, we learned that from 10,000 feet, from really far above, Spark looks like Scala collections. But internally, it doesn't behave much like Scala collections; in particular, Spark uses laziness to save time and memory on the latency front. In that vein, we saw transformations and actions, and we learned about caching and persistence. Or said another way, we learned how to selectively cache data in memory in order to save compute time. We also saw how the cluster's topology comes into the programming model: you can't forget where your code is running.

So far, most of the intuitions that we've developed have focused on distributing transformations such as map, flatMap, and filter, and we spent a lot of time visualizing how those transformations are parallelized and distributed. Though, if you noticed, we didn't yet consider how actions are distributed. So think about operations like reduce. How are these actions distributed? They're very different from operations like map or filter.

Okay, since they're so different, maybe we should first clarify what we mean by reduction operations. By reduction operations, I mean operations such as fold, reduce, or aggregate from Scala collections. All of these operations, including their variants like foldLeft or reduceRight, have something in common about how they're computed; they have some shared structure of computation. Can you remember what that is? Conceptually, reduction operations walk through collections and combine neighboring elements together to produce a single combined result. Note that this is unlike map, because map returns a new collection, whereas a reduction operation essentially combines the elements of the collection into some kind of single value, like an integer, a double, or a string. Many of Spark's actions can be considered reduction operations, but some actions are not. For example, saving things to a file is not a reduction operation, yet it's still an action in Spark.

So just to give you a more concrete example of what we mean by a reduction operation, let's look at a simple illustration. Let's say we have a case class, Taco, with two fields: one named kind, which is of type String, and one named price, which is of type Double. And let's say we have a list of instances of Taco, and we'll name that list tacoOrder. We can use a reduction operation to calculate the cost of the tacoOrder. In this case, we'll use a foldLeft operation, and we'll walk sequentially through the collection of tacos, accumulating and summing up the prices of all the tacos, starting with an initial price of zero.
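Here's a minimal sketch of that example (the specific taco kinds and prices are just illustrative):

    case class Taco(kind: String, price: Double)

    val tacoOrder = List(
      Taco("Carnitas", 2.25),
      Taco("Corn", 1.75),
      Taco("Barbacoa", 2.50),
      Taco("Chicken", 2.00))

    // Walk left to right through the list, adding each taco's price
    // to the running sum, starting from 0.0.
    val cost: Double = tacoOrder.foldLeft(0.0)((sum, taco) => sum + taco.price)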
So starting with this 0.0 double here, we're going to accumulate the prices and sum them up, and at the very end we'll have the total price of all these tacos. This is very simple, and I hope you remember these kinds of operations from the last few Scala courses. My point in showing it to you is just to remind you of the difference between this kind of operation, where the goal is to combine the elements together and return a single result, and the transformation-like operations we saw earlier, like map or filter, where we apply some kind of function to each element in the collection and return a new collection.

So now that you remember what I mean by a reduction operation, let's recall parallel reduction operations from the parallel programming course. In that course, we learned about foldLeft and fold. Do you remember which of these two was parallelizable? I'll give you a moment to think about it. Try to visualize the two operations and think carefully about why the pattern of computation for one might be parallelizable, and why the other one might not be.

You might remember that foldLeft is not parallelizable. According to the Scala API documentation, foldLeft applies a binary operator to a start value and all elements of the collection or iterator, going from left to right. So this is the start value here. Notice that we have to execute things from left to right, sequentially. And if you remember from the parallel programming course, Alex gave this really cool visualization with LEGO blocks, which shows how having two types that you combine down to one, (B, A) => B, forces us to execute things sequentially. But for me, this isn't the clearest illustration, even visually, so let's step through a simple example to see why we can't parallelize foldLeft. In particular, the LEGO block visualization tries to show that if we have to change the result type from A to B, we have to execute things sequentially from left to right.

So let's say we have a list xs with four integers in it: 1, 2, 3, and 4. And now we want to use foldLeft to walk through all the elements in the list and combine them together into a single string. So at the end, what we want is the string "1234". Let's step through this simple example and try to parallelize it by hand, to understand why the types are stopping us from doing this in parallel. Remember that the idea in parallel programming was that we would break these collections into pieces and individually work on each piece. So let's break this xs list into two pieces, try to compute this foldLeft function on each individual piece in parallel, and then combine them later.

So let's start with the first element in our first chunk. We take the zero, the empty string, and we concatenate it with the first element. This gives us "1". Then we do the same for the next element: we have "1", we add 2, and now we have the string "12". And we can imagine that in parallel the same thing is happening over here: we start with our zero, we take the first element, which is 3, and we get the string "3". The same thing happens for the next element: we have "3", we concatenate it with 4, and now we have "34". So these things can be done in parallel, independently of one another. Now we have to combine them together. But notice: our function is of type (String, Int) => String, while the types of these two individually computed pieces, if we want to try and combine them together, are String and String.
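To make this concrete, here's a minimal sketch of the attempt (splitting the list into chunks by hand like this is just for illustration):

    val xs = List(1, 2, 3, 4)
    val f = (acc: String, x: Int) => acc + x.toString

    // Each chunk can be folded independently, in parallel:
    val left  = List(1, 2).foldLeft("")(f)   // "12"
    val right = List(3, 4).foldLeft("")(f)   // "34"

    // But now we're stuck. To merge the chunks we'd need f(left, right),
    // and f expects a (String, Int), not two Strings:
    // f(left, right)  // does not compile: type mismatch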
Those don't fit into this function any more, so we would get a type error if we tried to combine them. I hope that illustrates what we mean when we say that because the type has to change, we can't do this operation in parallel: suddenly we can't combine the two individually computed pieces any more, due to the types. So this is why foldLeft is not parallelizable.

That brings us to the fold operation. As a reminder, fold is very similar to foldLeft, except with the requirement that we always have to return the same type. So remember, in the signature of foldLeft we had (B, A); now it's (A, A). Everything is the same type. And if we recall our last example, if this list had been a list of strings instead of a list of integers, then I hope you can see how, with all the types being the same, where we're taking strings, combining strings, and returning strings, it's easy to parallelize these operations. Said another way, the fold function enables us to parallelize things using a single function as a parameter, where the types are all the same, by enabling us to build parallelizable reduce trees. And if the LEGO visualization helps, here's how it would look to build one of these reduce trees. Here you can see how the tree can be broken up and worked on independently by different workers. So you can have somebody working on this part of the tree and somebody working on that part of the tree, totally independently of one another, and they can still combine all of the results together at the end. So I hope you have some intuition now for the differences between fold and foldLeft, and why it's easy to parallelize one operation but not the other.

And that brings me to the aggregate operation. Does anybody remember what aggregate does? This is important, so I'll give you a second to try and remember. I'll give you a hint, though: as it sounds, it's similar to foldLeft or fold. However, the signature is a little different. Okay, so here's the signature of the aggregate method for regular Scala collections. Now we have three parameters in two parameter lists. Like before, we have a start value of type B, but now we have two functions instead of one: seqop and combop. The first one, seqop, represents a sequential operator, and like the function in foldLeft, it operates on two types. The second one, combop, represents a combination operator, and like the function in regular fold, it operates on only one type, in this case B. So seqop is kind of like the function in foldLeft, and combop is kind of like the function in regular fold.

While the signature might seem complicated, it's actually great for parallelizing things over a cluster. In fact, it's considered to be even more general than fold or foldLeft, because it's both parallelizable and it makes it possible to change the return type to something else. To bring back the cool LEGO visualization, we can now visualize how it's possible to parallelize the aggregate operation. As you can see, it allows us to do the work of the foldLeft on separate chunks, in parallel. So you can have individual workers, think individual nodes or individual executors, doing this work. We can change the types using this foldLeft-style sequential reduction operator, seqop, but we can do it in parallel on the chunks. And now that we have this combop function, it makes it possible to build these nice reduce trees that are so easy to parallelize.
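Here's a minimal sketch of aggregate on regular Scala collections (this uses the Scala 2.12-era signature; aggregate was deprecated on the standard collections in Scala 2.13, but the idea carries straight over to Spark):

    // Roughly: def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B
    val words = List("apple", "banana", "cherry")

    val totalChars = words.aggregate(0)(
      (acc, s) => acc + s.length,  // seqop: (Int, String) => Int, changes the type like foldLeft
      (a, b) => a + b)             // combop: (Int, Int) => Int, merges chunks like fold
    // totalChars == 17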
Okay, great. Now that we have an intuition about why some operations can be done in parallel and some can't, and about how to do everything we want to do, like changing types while doing reduction computations in parallel, how does this relate to RDDs? How do we distribute these things? Well, let's start by looking at which of these reduction operations exist on regular Scala collections and which exist on Spark RDDs. What we see is that the operations we knew couldn't be parallelized simply don't even exist on RDDs: the sequential foldLeft and foldRight operators are not defined on RDDs at all. You have no choice but to use something else. This could be perhaps a little frustrating, because foldLeft tends to be a pretty commonly used reduction operator when working with collections. It seems to be one of the more popular variants, and now you suddenly don't have access to it at all.

So what does this mean? Well, since in regular Scala collections our only choices for changing the return type of a reduction operation were foldLeft, foldRight, or aggregate, this means that in Spark, where there is no foldLeft or foldRight operation, we can only use the aggregate operation if we have to change the return type of our reduction operation. So now you have no choice. You can't reach for the familiar foldLeft anymore; you have to use this weird aggregate thing that maybe you didn't use so much before in regular Scala collections.

But you might be saying, hey, hold on a second. Why not still have a serial foldLeft or foldRight operation in Spark? It would mean a simpler and more convenient API for users, and it would be more in line with regular Scala collections, right? Well, it turns out that trying to do anything serially across a cluster is actually very difficult. Enforcing ordering in a distributed system is very hard, and sometimes impossible. And in any case, it requires a lot of communication and synchronization between nodes, which, as we've learned throughout several of our sessions now, is extremely costly. So it simply doesn't make sense to try to make sure that something happens before another thing in a distributed system, which means it typically doesn't make a lot of sense to offer serial operations like foldLeft on a cluster.

In fact, I'll even argue that in Spark, the aggregate operation is actually a more desirable reduction operation a lot of the time. Why do you think that's the case? Remember, we're in distributed thinking now. We have big data sets. What is it about the jobs that you might want to do with Spark that would make an operation like aggregate so desirable to use? Think about it. As you'll get to know from the programming assignments, and hopefully from real data analysis jobs you may one day do on real clusters with Spark, much of the time, when you have large amounts of data, you typically have very complicated records as the elements of your RDD. The individual element types are very big, nested, complex things. They're often not simple integers or strings like we've been looking at; instead, they're really complex data types. And often, you don't need the whole thing to do the analysis you want to do. You may want to project this really complicated data type down into something simpler to actually work on.
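Before we look at a concrete example, here's a minimal sketch of how this plays out on an RDD (assuming a SparkContext named sc is in scope):

    val rdd = sc.parallelize(List(1, 2, 3, 4))

    // fold exists on RDDs, but the result type must equal the element type:
    val sum: Int = rdd.fold(0)(_ + _)

    // aggregate also exists, and it can change the result type (here, Int to Double):
    val total: Double = rdd.aggregate(0.0)((acc, x) => acc + x, (a, b) => a + b)

    // foldLeft, by contrast, is simply not defined on RDDs:
    // rdd.foldLeft("")((acc, x) => acc + x)  // does not compile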
For example, if you wanted to do a computation involving the data stored on Wikipedia, you may have a data type called WikipediaPage which contains all kinds of fields, probably more than what's listed here, but which could include things like the title of the article, the redirectTitle of the article, the timestamp of the article, the username of the last person to contribute to the article, and the actual text contained within the article. You can imagine many more possible fields, like a list of articles linked to in that Wikipedia article, etc. And you can imagine that to do some kind of interesting computation or analysis on your Wikipedia data, you may only care about the title of the article and the timestamp. In this scenario, if you had to keep the full text of every Wikipedia article around in every element of your RDD, it could waste a lot of time and a lot of memory to carry this perhaps very big full-text string around in your accumulator. This is why you might find yourself using the aggregate operation more often in Spark than you did in Scala collections: typically you want to get rid of the stuff you don't need and project down to a smaller data type.
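Here's a minimal sketch of that idea (the sample pages and the goal of finding the most recently edited article are assumptions made for illustration; comparing timestamps as strings assumes an ISO-8601 format, which sorts lexicographically):

    case class WikipediaPage(
      title: String,
      redirectTitle: String,
      timestamp: String,
      lastContributorUsername: String,
      text: String)

    val pages = sc.parallelize(Seq(
      WikipediaPage("Scala", "", "2016-01-01T00:00:00Z", "alice", "<full article text...>"),
      WikipediaPage("Spark", "", "2017-03-15T12:00:00Z", "bob", "<full article text...>")))

    // Project each big WikipediaPage down to a small (title, timestamp) pair;
    // the accumulator never carries the full article text across the cluster.
    val mostRecent: (String, String) = pages.aggregate(("", ""))(
      (acc, page) => if (page.timestamp > acc._2) (page.title, page.timestamp) else acc,
      (a, b) => if (a._2 > b._2) a else b)
    // mostRecent == ("Spark", "2017-03-15T12:00:00Z")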