Before we get into Spark SQL, which is the topic of this week, I first want to talk to you about structure and optimization, to sort of motivate Spark SQL. Before I say anything further about what I mean by structure and optimization, let's start with an example.

Let's imagine that we're an organization called Code Award, offering scholarships to programmers who have overcome some kind of adversity. Let's say we have the following two datasets, where each of these case classes represents some kind of information about a person. In the Demographic case class, we have information like an ID number, a person's age, whether or not they attended some kind of coding bootcamp, the country that they live in or are from, and whether or not they belong to some kind of minority or have served in a military capacity. And then we have financial information: whether or not this person has any sort of debt, whether or not they have financial dependents like children or anybody else they have to financially support, whether they have student loans, and finally, their income.

So we have two datasets, each full of a different piece of information about different people. One is called demographics, a pair RDD where the ID is the key and the value is an instance of this Demographic type. The same is true for finances, a pair RDD where the key is an ID number and the value is an instance of Finances. Okay, so these are our two datasets. And let's assume that our data includes information about people from many countries, with many life and financial backgrounds.

Given that, let's imagine that our goal is to count up and select students for a specific scholarship. Of course, every scholarship has some kind of criteria, so what is ours? Let's say that we have grant money given to us by the Swiss government, so we want to target Swiss students, and we want to target people who are potentially in need financially: maybe they have some kind of debt, or they have financial dependents. So, as a first step, we want to have a look at our dataset and see how many people might be eligible by this criteria. First, let's count up the people in our dataset who are Swiss and who have both some kind of debt and financial dependents.

Assuming that we just want to count the number of people in our dataset who match this criteria, how could we implement this Spark program? Again, remember that we have two pair RDDs available to us, one called demographics and one called finances, with ID numbers as keys and instances of Demographic and Finances as the values, okay? So, how do we implement this Spark program? I'll give you a moment to try to come up with an example solution.
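Before looking at possible solutions, here is a minimal sketch of what these two datasets might look like. The exact field names, the sample records, and the SparkContext `sc` are assumptions based on the description above, not the lecture's own code.

```scala
import org.apache.spark.rdd.RDD

// Field and type names are assumptions based on the description above.
case class Demographic(id: Int,
                       age: Int,
                       codingBootcamp: Boolean,
                       country: String,
                       isEthnicMinority: Boolean,
                       servedInMilitary: Boolean)

case class Finances(id: Int,
                    hasDebt: Boolean,
                    hasFinancialDependents: Boolean,
                    hasStudentLoans: Boolean,
                    income: Int)

// Pair RDDs keyed by a person's ID. `sc` is assumed to be an existing
// SparkContext, and the sample records are purely illustrative; in
// practice these datasets would be read from files and parsed.
val demographics: RDD[(Int, Demographic)] = sc.parallelize(Seq(
  1 -> Demographic(1, 24, codingBootcamp = true, country = "Switzerland",
                   isEthnicMinority = false, servedInMilitary = false),
  2 -> Demographic(2, 31, codingBootcamp = false, country = "India",
                   isEthnicMinority = true, servedInMilitary = true)
))

val finances: RDD[(Int, Finances)] = sc.parallelize(Seq(
  1 -> Finances(1, hasDebt = true, hasFinancialDependents = true,
                hasStudentLoans = false, income = 18000),
  2 -> Finances(2, hasDebt = false, hasFinancialDependents = true,
                hasStudentLoans = true, income = 32000)
))
```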
Well, the short answer is that there are many ways to implement this program. Since we know we need to combine the demographic and financial information for a given person, we need to do an inner join. The return type of this join is another pair RDD, with an Int key representing the ID and a pair of Demographic and Finances as the value. Once we've done that join, we can filter to keep the people who meet our criteria: in the first element of the resulting value pair, which is of type Demographic, we check whether this person is from Switzerland, and in the second element, of type Finances, we check whether they have financial dependents and whether they have debt. If all of these conditions are met, the record passes through the filter, and finally we count everything up at the end. So, just to summarize this first solution: we do an inner join, then we filter to select the people in Switzerland who have debt and financial dependents, and then we count it all up at the end.

Another possibility is to do our filters first. We could take our two datasets, finances and demographics, and filter each one down with the conditions that we already know we have. Once we've done that filtering, we can join the filtered results and count them up. In the finances dataset, we keep the people who have both financial dependents and debt, and in the demographics dataset, we keep the people who are from Switzerland. Once we've filtered those two datasets down, we do our join and then count. So, to summarize this second solution: we filter the finances dataset down to people with debt and financial dependents, we filter the demographics dataset down to people in Switzerland, and finally we do an inner join on these smaller, filtered-down datasets. The idea here is that the datasets being joined are now smaller, so the join should have less work to do.

Yet another possibility is to start with a Cartesian product between demographics and finances. We compute the Cartesian product, then filter to keep only the pairs that have the same IDs, which is equivalent to an inner join, and finally we filter to keep the people who are from Switzerland and who have both financial dependents and some kind of debt, and count it up at the end. This third possibility gives us the same answer. So again, to go through the steps: we start by computing the Cartesian product of the two datasets, then we filter to select the results of the Cartesian product that have the same IDs, and finally we filter to select the people from Switzerland who have debt and financial dependents.
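Sketched as Spark code, the three possibilities might look roughly like the following. This reuses the assumed case classes and pair RDDs from the sketch above, so the exact field names are assumptions rather than the lecture's own code.

```scala
// Possibility 1: inner join first, then filter, then count.
val possibility1 =
  demographics.join(finances)
    .filter { case (_, (d, f)) =>
      d.country == "Switzerland" && f.hasFinancialDependents && f.hasDebt
    }
    .count()

// Possibility 2: filter each dataset first, then join the (smaller) results.
val filteredFinances =
  finances.filter { case (_, f) => f.hasFinancialDependents && f.hasDebt }
val filteredDemographics =
  demographics.filter { case (_, d) => d.country == "Switzerland" }
val possibility2 =
  filteredDemographics.join(filteredFinances).count()

// Possibility 3: Cartesian product, keep only matching IDs (an inner join
// done by hand), then filter on the criteria.
val possibility3 =
  demographics.cartesian(finances)
    .filter { case ((id1, _), (id2, _)) => id1 == id2 }
    .filter { case ((_, d), (_, f)) =>
      d.country == "Switzerland" && f.hasFinancialDependents && f.hasDebt
    }
    .count()
```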
The bottom line is that, for all three of these possibilities, the end result is always the same. The big difference is the time it takes to compute the job: while the resulting value is the same, the execution times are vastly different. Here are the running times of the first two possibilities on a dataset of roughly 150,000 people. The first possible solution, where we did the join first and the filters afterwards, took 4.97 seconds. The second possible solution, where we filtered the datasets down before computing the join, took only 1.35 seconds. So the running times of these two solutions are quite different: filtering the datasets first results in a running time that's about 3.6 times faster than doing the join first and the filter second. But that doesn't even compare to the possible solution where we computed the Cartesian product first.

In that case, it took approximately 4 minutes to complete the computation, versus 1.35 seconds or 4.97 seconds for the other two possible solutions. That's roughly 177 times slower than if we had done an inner join instead of a Cartesian product. This is kind of common sense, but the point is to show that you can get the same result with vastly different running times. Since a recurring theme of the course so far has been that we have to carefully think about what our Spark jobs are doing, I would hope that you wouldn't think to use a Cartesian product first. However, it is conceivable that you might have found yourself in a situation where you lost a factor of 3 or 4 in performance by choosing to join first and filter afterwards. Many people would think to join first and then filter second, whereas, in retrospect, it is more sensible to filter first and then join afterwards.

Wouldn't it be nice, though, if Spark automatically knew, when we wrote the code in possibility 3, the Cartesian product version, or even the code in possibility 1, that it could rewrite our code to be possibility 2? Spark would just automatically know that it's smarter to do the filter before the join, rather than the filter after the join. It would be great if Spark was smart enough to do that. What I hope to convey over the next few sessions is that, given a bit of extra structural information, Spark is actually able to do many of these optimizations for you, and that all comes in the form of Spark SQL. So the key here is that structural information enables optimizations.

In order to get a little bit of intuition about how that might be the case, we've got to start by looking at what I mean by structure. To start, all data is not created equal in a structural sense. Data falls on some kind of spectrum between unstructured and structured. But what does structured mean? Well, structured data is something like a database table, like SQL, something very rigid with a fixed schema. On the far other end of the spectrum is unstructured data: things like dumps of log files, unstructured text data of some kind, or even images; images are another form of unstructured data. In the middle, we have semi-structured data, things like JSON or XML. These are datasets that carry along some kind of schema, and they're so-called self-describing: they describe their own structure. However, their schemas are not as rigid as the schemas in a database table. So, as you can see, there's a bit of a spectrum here, from unstructured to very structured. And so far, with RDDs, we've kind of focused on the unstructured side of the spectrum. We've typically been reading in data from logs, or data from JSON, manually parsing the JSON ourselves into case class objects, and then doing some kind of large-scale computation on all of that once we've gotten it into Spark somehow.

But how does all of that relate back to Spark? Well, so far, Spark and the regular RDDs we've been looking at have no concept of the schema of the data they're dealing with. Spark has no idea what the structure of the data it's operating on looks like. All Spark knows, given some RDD, is that that RDD is parameterized with some kind of arbitrary type, like Person, Account, or Demographic. But it doesn't actually know anything about these types' structure. It just has a name; it knows it should have a Person inside of it.
But it doesn't know anything about what that Person is made up of. Perhaps a little more visually: assume that we have a case class Account with these fields: name, of type String; balance, of type Double; and risk, of type Boolean. With an RDD of type Account, all Spark knows about Account is that the RDD contains blobs of objects called Account. Spark knows nothing about them other than their name. Spark can't look inside of these Account objects to look at their structure and analyze which parts of that structure might be used in the subsequent computation. For example, it's conceivable that this Account object might be bigger than these three fields; it could have hundreds of fields, and perhaps a certain computation only needs one of them. But Spark, as is, will serialize each one of these really big Account objects and send them all over the network, even though most of the data it's sending around isn't needed. Spark can't do optimizations like these, because it can't see inside of these Account objects, and it can't optimize based on their structure.

On the other hand, in a structured dataset, in some kind of database table, for example, computations are done on columns of named and typed values. So absolutely everything about the structure of the dataset is known in a structured setting like a database table or Hive. And, in fact, databases tend to be heavily optimized. Given all of this structure, optimizations like the one I mentioned with this Account object can be done, because we know the structure of the dataset that we're operating on, we know which parts of it we are going to use, and we can optimize based on that information.

So far we've been talking about structured versus unstructured data, but the same can also be said for computation: there is also structured and unstructured computation. So far in Spark, what we've been doing is functional transformations on data. We pass some kind of user-defined function literal to a higher-order function like map, flatMap, or filter. So we're passing around, essentially, some kind of lambda, some sort of function value. This, too, is completely opaque. Just like in the previous example, Spark only knows that this function is called some funny compiler-generated name like anon-dollar-sign-something-or-other. So, in this case, this is also completely unstructured and opaque: Spark can't look at the operations that we're trying to do and then make optimizations based on those operations. Again, we have some kind of opaque blob that Spark can't optimize.

Whereas in databases, we typically do declarative transformations on the structured data in the dataset, and all of these operations tend to be very specialized, very structured, very fixed, rigid, predefined operations. So we know all of the possible operations that somebody could potentially do, and we can do lots of optimizations based on knowing all of the possibilities for these operations.

So, just to put these two things next to each other: in summary, what we have when we look at Spark RDDs is, basically, a bunch of these unstructured objects that we don't know much about, and some kind of functionality that we also don't know anything about, just some kind of lambda function that we can't look into and make optimizations based off of.
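To make that opacity a bit more concrete, here is a small sketch using the Account fields just described. The SparkContext `sc` and the sample records are assumptions for illustration, not code from the lecture.

```scala
import org.apache.spark.rdd.RDD

case class Account(name: String, balance: Double, risk: Boolean)

// `sc` is assumed to be an existing SparkContext; the records are
// illustrative only.
val accounts: RDD[Account] = sc.parallelize(Seq(
  Account("alice", 1500.0, risk = false),
  Account("bob", -200.0, risk = true)
))

// To Spark, each element is just an opaque blob of type Account, and the
// lambda below compiles to an anonymous function with a name like
// `$anonfun$...`. Spark cannot see that only `risk` and `balance` are used
// here, so it cannot, for example, avoid serializing and shipping the
// fields the computation never touches.
val flagged = accounts.filter(a => a.risk && a.balance < 0.0)
```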
On the other hand, we have databases, where we have this very structured dataset: everything is of some specified data type, organized in rows and columns in a very rigid structure, and there is a very fixed set of operations that we can do on that structured data. So, on the one hand we don't have much structure, and it's difficult to aggressively optimize with so little structure. And on the other hand, we have lots of structure, and optimization opportunities abound, which gives us all kinds of opportunities to do optimizations like reordering operations, for example, and putting filters before joins.

So that brings me back to optimizations in Spark. How does all of this talk about structured data and structured computations come back to Spark? As we've seen, RDDs essentially operate on unstructured data, which we have to parse ourselves, and there are few limits on the computations that we can do: our computations are defined as functions that we've written ourselves and pass to higher-order functions, on data types that we've also defined ourselves. As we painstakingly learned over the past two weeks, we have to think a lot about what's happening on the cluster, and we have to do optimizations ourselves; we have to think about how we can optimize our computations. Whereas in the database world, optimizations are done automatically for you. Wouldn't it be nice if Spark could do some of these optimizations for us as well? Well, that's the whole point of Spark SQL: Spark SQL makes these optimizations possible. The one caveat is that we're going to have to give up some of the freedom, flexibility, and generality that we've learned to love in this functional collections API, in order to give Spark some structure and thus more opportunities to optimize.
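As a small preview of where the next sessions are headed, here is a rough sketch of how the same scholarship query might look with Spark SQL's DataFrame API. This is an assumption-laden illustration, not code from the lecture: it assumes a SparkSession called spark (as in the Spark shell), JSON files named demographics.json and finances.json, and the column names used earlier.

```scala
import org.apache.spark.sql.DataFrame

// `spark` is assumed to be an existing SparkSession; file paths and
// column names are purely illustrative.
import spark.implicits._

val demographicsDF: DataFrame = spark.read.json("demographics.json")
val financesDF: DataFrame     = spark.read.json("finances.json")

// Written naively as join-then-filter, yet because Spark SQL can see both
// the columns being used and the operations being applied, its optimizer
// is free to push the filters below the join on its own.
val eligible =
  demographicsDF.join(financesDF, "id")
    .filter($"country" === "Switzerland" &&
            $"hasFinancialDependents" && $"hasDebt")
    .count()
```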