Editorial (from Vitosh): Lately, I have reviewed the Mastering Large Datasets book by J.T. Wolohan. The book is Python-based and explains how to scale a data project using the map and reduce pattern. Long story short, I liked one of the examples from the book, so I decided to publish it here. The people from the publishing house were kind enough to provide a special discount code for the book, good for all formats and with no expiration date: mldwohan40.
Scenario – Changing Car Trends
Your customer is a used car dealer. They have data on cars that they’ve bought and sold in the last 6 months and are hoping you can help them find which types of used cars they make the most profit on. One salesman believes that it’s high fuel-efficiency cars (those that get more than 35 miles per gallon) that make the most money, while another believes that medium-mileage cars (between 60,000 and 100,000 miles) result in the highest average profit on resale. Given a JSON file with a variety of attributes about some used cars, write a script to find the average profit on cars of low (<18 mpg), medium (18-35 mpg), and high (>35 mpg) fuel efficiency, as well as low (<60,000 miles), medium (60,000-100,000 miles), and high (>100,000 miles) mileage, and settle the debate.
Before we dig into the details of the problem, let’s take a look at its fundamentals: the data transformations. We’ll start with a series of dicts, each of which represents a vehicle. By default, these dicts will have a lot of information we’re not interested in and won’t have some of the information we do want, so it’s a good idea to transform them into a better format for analysis. We’ll tackle that with a map, because we want to clean up each dict. From there, we want to roll that data up into a dict that helps us understand the profit produced by each type of car. This will require a reduction.
Overall, the whole problem will look something like figure 5.10. On the left, we start with the data our customer hands us. We’ll concoct a function to clean up each record and map it across our data. Then we’ll pass the result into reduce, together with an accumulator function we’ve designed to collect the necessary information. For this, we’ll want to gather both sum and count by group, the two figures necessary to calculate an average.
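To see the sum-and-count idea in isolation, here is a minimal sketch on toy data. The records, the clean and add_to_group functions, and the numbers are all hypothetical; they only illustrate the shape of the pipeline: a map that tidies each record, then a reduce that accumulates a sum and a count per group.

```python
from functools import reduce

# Hypothetical raw records; values arrive as strings, as text-based data often does.
raw = [
    {"group": "low", "value": "100"},
    {"group": "high", "value": "300"},
    {"group": "low", "value": "200"},
]

def clean(d):
    """Map step: keep only what we need, converted to a usable type."""
    return (d["group"], float(d["value"]))

def add_to_group(acc, record):
    """Reduce step: fold one record into the running sum and count for its group."""
    group, value = record
    totals = acc.get(group, {"sum": 0.0, "count": 0})
    acc[group] = {"sum": totals["sum"] + value, "count": totals["count"] + 1}
    return acc

sums_and_counts = reduce(add_to_group, map(clean, raw), {})
# Only after the reduction do we divide, once per group.
averages = {g: t["sum"] / t["count"] for g, t in sums_and_counts.items()}
print(averages)  # {'low': 150.0, 'high': 300.0}
```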
Using map to clean our car data
In order to design our cleaning helper function, let’s first take a closer look at the individual elements we’re going to be working with. Each car in our data set will look something like the figure below:
For each entry, we’ll have a dict with lots of attributes we’re not particularly interested in, along with the four that we are interested in: price-buy, price-sell, mpg, and odo. These four keys represent the price the car was bought at, the price the car was sold at, the manufacturer-listed miles per gallon of the vehicle, and the number of miles on the odometer. However, we’re not actually interested in the values of any of these variables directly. Rather, we’re interested in values that can be calculated from them.
- Instead of price bought and sold, we’re interested in total profit
- Instead of absolute miles-per-gallon, we’re interested in low, medium, and high mpg
- Instead of absolute number of miles, we’re interested in low, medium, and high mileage
To clean each data entry, then, we’ll want to do the following things:
- Calculate profit on the vehicle from price bought and sold.
- Sort the vehicle into low, medium and high miles per gallon.
- Sort the vehicle into low, medium, and high mileage.
To do this, we’ll create three separate functions that each handle a piece of the problem and wrap them in a single function we can map across all our data. Let’s design each of these three helper functions now, starting with calculating profit.
Calculating profit is just basic arithmetic. In other circumstances, this might be a good case for a lambda function; however, because we’re planning on using this function inside another function, we’ll want to give it a name. Our get_profit function finds the difference between the price the car was sold at and the price the car was bought at:
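```python
def get_profit(d):
    # Defaults of 0 guard against missing keys; float() handles prices stored as strings.
    return float(d.get("price-sell", 0)) - float(d.get("price-buy", 0))
```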
One thing to note about listing 5.9 is that we use the get method of the dict instead of the [] syntax, because get lets us provide a default value. We do this to preempt the errors that a missing value would throw (though there are no missing values in the data you’ve been provided).
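A quick illustration of the difference, using a hypothetical record that is missing its purchase price:

```python
# Hypothetical record with no "price-buy" key.
car = {"price-sell": 9000}

# d.get(key, default) falls back to the default instead of raising.
buy_price = car.get("price-buy", 0)
print(buy_price)  # 0

# Square-bracket lookup raises KeyError for a missing key.
try:
    car["price-buy"]
    bracket_lookup_failed = False
except KeyError as err:
    bracket_lookup_failed = True
    print("KeyError:", err)  # KeyError: 'price-buy'

# With the defaults, the profit calculation still runs.
profit = car.get("price-sell", 0) - car.get("price-buy", 0)
print(profit)  # 9000
```

Notice the trade-off: the default keeps the code running, but with price-buy missing the whole sale price is counted as profit. Defaults like this are safest when, as here, the data is known to have no missing values.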
Next up, we have two helper functions that provide similar functionality. One which buckets miles per gallon into three categories—low, medium and high—and one which buckets mileage into three categories—low, medium and high. Because these functions are so similar, let’s work on them at the same time.
Both of these functions share a common behavior: comparing a value to a series of break points and then assigning it to low, medium, or high. We can write a general function that takes a dictionary, a key, and two break points, and returns low when the value of the dictionary at the specified key is below the first break point, medium when it’s below the second, and high otherwise. That function looks like this:
```python
def low_med_hi(d, k, breaks):
    # breaks is a (first, second) tuple, the form clean_entry passes in
    if float(d[k]) < breaks[0]:  #A
        return "low"
    elif float(d[k]) < breaks[1]:  #B
        return "medium"
    return "high"  #C
```
- A – If the value of the dict at the key of interest is below our first break, we return low
- B – If that value is below the second break, we return medium
- C – If it’s not lower than either break, we’ll return high
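To check the boundary behavior, here is a self-contained sketch that takes the two break points as a tuple (the form clean_entry passes them in) and converts the value with float(), as the complete listing at the end does. The mpg values are made up for illustration.

```python
def low_med_hi(d, k, breaks):
    """Bucket d[k] as "low", "medium", or "high" given (first, second) break points."""
    value = float(d[k])
    if value < breaks[0]:
        return "low"
    elif value < breaks[1]:
        return "medium"
    return "high"

# Hypothetical mpg values probing each side of the (18, 35) breaks.
for mpg in [17.9, 18, 34.9, 35, 40]:
    print(mpg, low_med_hi({"mpg": mpg}, "mpg", (18, 35)))
```

Because the comparisons use <, a value sitting exactly on a break point falls into the higher bucket: an 18 mpg car is medium and a 35 mpg car is high, even though the scenario describes high as >35 mpg.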
With this function written, we can start to assemble all of the pieces together. We’ll want to do the following.
- Take in a dict.
- Compute the values we care about with our helper functions.
- Return a dict that has three keys:
- A profit key indicating the profit made on the vehicle.
- An mpg key indicating the vehicle’s mpg category.
- An odo key indicating the vehicle’s mileage category.
```python
def clean_entry(d):
    r = {}  #A
    r['profit'] = get_profit(d)  #B
    r['mpg'] = low_med_hi(d, 'mpg', (18, 35))  #C
    r['odo'] = low_med_hi(d, 'odo', (60000, 100000))  #D
    return r
```
- A – We first initialize a new dict for our output data
- B – We can use our profit function to get the profit
- C – We’ll use the low_med_hi function twice to get our mpg and odo categories
- D – Each call passes different break points, matching the categories defined for that variable.
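Putting the pieces together on a single record may help. This sketch assumes prices, mpg, and odo arrive as strings (hence the float() calls, as in the complete listing at the end) and uses the 100,000-mile upper break from the scenario. The record itself, including the unused color attribute, is invented.

```python
def get_profit(d):
    return float(d.get("price-sell", 0)) - float(d.get("price-buy", 0))

def low_med_hi(d, k, breaks):
    value = float(d[k])
    if value < breaks[0]:
        return "low"
    elif value < breaks[1]:
        return "medium"
    return "high"

def clean_entry(d):
    """Map step: turn a raw car record into profit plus two category labels."""
    r = {}
    r["profit"] = get_profit(d)
    r["mpg"] = low_med_hi(d, "mpg", (18, 35))
    r["odo"] = low_med_hi(d, "odo", (60000, 100000))
    return r

# Hypothetical raw record; "color" stands in for attributes we ignore.
car = {"price-buy": "8000", "price-sell": "9500", "mpg": "38", "odo": "72000", "color": "blue"}
print(clean_entry(car))  # {'profit': 1500.0, 'mpg': 'high', 'odo': 'medium'}
```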
Using reduce for sums and counts
With our map wrapper function written, it’s time to move on to our reduction. Knowing what our map will return, we can use reduce to convert those items into our desired output data. What we want is a dict covering six categories: high, medium, and low miles per gallon, and high, medium, and low mileage, grouped under an mpg key and an odo key. Each category should hold the average profit on vehicles of that type. Because we’ll need the total profit and the total number of cars sold to calculate average profit, we’ll keep track of those values as well, and for readability it makes sense to put them into a dictionary too. This leaves us with a nested dict in which each of the six categories points to another dictionary with three keys: one for average profit, and two for the values necessary to calculate the average profit.
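Why keep the total and count instead of just a running average? Blending the old average with each new value weights the later cars too heavily. A small worked example with made-up profits for one category:

```python
profits = [1000.0, 2000.0, 6000.0]  # hypothetical profits for one category

# Wrong: averaging the previous average with each new value.
naive = profits[0]
for p in profits[1:]:
    naive = (naive + p) / 2

# Right: keep a total and a count, then divide.
total, count = 0.0, 0
for p in profits:
    total += p
    count += 1
average = total / count

print(naive, average)  # 3750.0 3000.0
```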
To do this, our accumulator function will roll the profit of each observation of our dataset into keys of our accumulated value: one based on its mileage category and one based on its miles per gallon category. Because calculating the total profit, count and average is a little involved—more than we can accomplish with a single expression—let’s wrap this behavior in a helper function. That helper function will take the accumulated total, count and average of the category of car and mix in the profit for the new car, while also incrementing the count and calculating a new average. These two functions together can be seen below:
```python
def acc_average(acc, profit):  #A
    acc['total'] = acc.get('total', 0) + profit  #B
    acc['count'] = acc.get('count', 0) + 1
    acc['average'] = acc['total'] / acc['count']  #C
    return acc

def sort_and_add(acc, nxt):  #D
    profit = nxt['profit']  #E
    acc['mpg'][nxt['mpg']] = acc_average(acc['mpg'].get(nxt['mpg'], {}), profit)  #F
    acc['odo'][nxt['odo']] = acc_average(acc['odo'].get(nxt['odo'], {}), profit)
    return acc
```
- A – We define a helper function to calculate averages
- B – We’ll use the get method here in case we find an empty dictionary
- C – Our average value will be the total profit divided by the count.
- D – Again our accumulator function is going to take an acc and a nxt
- E – Because we’re going to use profit twice, we’ll store it in a variable for easy access.
- F – We’ll modify the accumulated value for each of the two categories that the car belongs in.
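Here is acc_average on its own, fed a few hypothetical profits one at a time, starting from an empty dict the way sort_and_add does the first time it sees a category:

```python
def acc_average(acc, profit):
    """Fold one profit into a running total, count, and average."""
    acc["total"] = acc.get("total", 0) + profit
    acc["count"] = acc.get("count", 0) + 1
    acc["average"] = acc["total"] / acc["count"]
    return acc

stats = {}  # empty, as for a category seen for the first time
for profit in [1000.0, 3000.0, 500.0]:  # hypothetical profits
    stats = acc_average(stats, profit)
    print(stats)
# {'total': 1000.0, 'count': 1, 'average': 1000.0}
# {'total': 4000.0, 'count': 2, 'average': 2000.0}
# {'total': 4500.0, 'count': 3, 'average': 1500.0}
```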
As we’ve done several times already in this chapter, we’re using the dictionary get method to access the key of a dictionary and provide a default value. In each case, we want a default value that provides the expected type of data to the function using the result. In our acc_average function, we use get because our addition operation needs a number, so we specify the integer zero if we don’t have the key in question. In our sort_and_add accumulator function, we specify an empty dictionary because acc_average expects a dictionary in its first position. Because we use the get method in both places, we can go from having no data to having a fully populated data structure without making any assumptions about which categories are in the underlying data. This is the same trick we used in our frequencies reduction example, just on a bigger scale.
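A consequence of this pattern is that the output only contains categories that actually appear in the data. A sketch with three hypothetical cleaned records, none of which is a medium-mpg car; the two functions are repeated so the block runs on its own:

```python
from functools import reduce

def acc_average(acc, profit):
    acc["total"] = acc.get("total", 0) + profit
    acc["count"] = acc.get("count", 0) + 1
    acc["average"] = acc["total"] / acc["count"]
    return acc

def sort_and_add(acc, nxt):
    profit = nxt["profit"]
    # get(..., {}) creates a category the first time it is seen.
    acc["mpg"][nxt["mpg"]] = acc_average(acc["mpg"].get(nxt["mpg"], {}), profit)
    acc["odo"][nxt["odo"]] = acc_average(acc["odo"].get(nxt["odo"], {}), profit)
    return acc

# Hypothetical cleaned records (output of the map step).
cleaned = [
    {"profit": 1000.0, "mpg": "high", "odo": "low"},
    {"profit": 3000.0, "mpg": "high", "odo": "medium"},
    {"profit": 500.0, "mpg": "low", "odo": "low"},
]
results = reduce(sort_and_add, cleaned, {"mpg": {}, "odo": {}})
print(sorted(results["mpg"]))  # ['high', 'low']
print(results["mpg"]["high"])  # {'total': 4000.0, 'count': 2, 'average': 2000.0}
```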
Applying the map and reduce pattern to cars data
With all of our helper functions written, including the data transformation for map and the accumulator for reduce, we’re ready to process our data. One of the great things about using a map and reduce style is that this takes only a single line of code:
```python
reduce(sort_and_add, map(clean_entry, cars_data), {"mpg": {}, "odo": {}})
```
We use map to apply the clean_entry function to each entry in our cars data, resulting in a cleaned sequence of data that is ready for us to reduce through. Then we call reduce with its three parameters: the accumulator function, the data, and an optional initializer. For the accumulator function, we use the accumulator we designed: sort_and_add. For the data, we use the results from our map operation. For the initializer, we use a dict holding empty mpg and odo dicts, because sort_and_add looks up those two keys directly. All together, our code looks like this:
```python
from functools import reduce
import json

def low_med_hi(d, k, breaks):
    # Bucket the value at d[k] using a (first, second) pair of break points.
    if float(d[k]) < breaks[0]:
        return "low"
    elif float(d[k]) < breaks[1]:
        return "medium"
    else:
        return "high"

def clean_entry(d):
    # Map step: reduce a raw car record to its profit and two category labels.
    r = {'profit': None, 'mpg': None, 'odo': None}
    r['profit'] = float(d.get("price-sell", 0)) - float(d.get("price-buy", 0))
    r['mpg'] = low_med_hi(d, 'mpg', (18, 35))
    r['odo'] = low_med_hi(d, 'odo', (60000, 100000))  # 60,000-100,000 miles is "medium"
    return r

def acc_average(acc, profit):
    # Fold one profit into a running total, count, and average.
    acc['total'] = acc.get('total', 0) + profit
    acc['count'] = acc.get('count', 0) + 1
    acc['average'] = acc['total'] / acc['count']
    return acc

def sort_and_add(acc, nxt):
    # Reduce step: add the car's profit to both its mpg and its odo category.
    p = nxt['profit']
    acc['mpg'][nxt['mpg']] = acc_average(acc['mpg'].get(nxt['mpg'], {}), p)
    acc['odo'][nxt['odo']] = acc_average(acc['odo'].get(nxt['odo'], {}), p)
    return acc

if __name__ == "__main__":
    with open("cars.json") as f:
        xs = json.load(f)
    results = reduce(sort_and_add, map(clean_entry, xs), {"mpg": {}, "odo": {}})
    print(json.dumps(results, indent=4))
```
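One detail in the listing is easy to miss: sort_and_add indexes acc["mpg"] and acc["odo"] directly, so the initializer passed to reduce must already contain those two keys. A bare {} fails on the first record. A minimal sketch with a single hypothetical cleaned record:

```python
from functools import reduce

def acc_average(acc, profit):
    acc["total"] = acc.get("total", 0) + profit
    acc["count"] = acc.get("count", 0) + 1
    acc["average"] = acc["total"] / acc["count"]
    return acc

def sort_and_add(acc, nxt):
    profit = nxt["profit"]
    acc["mpg"][nxt["mpg"]] = acc_average(acc["mpg"].get(nxt["mpg"], {}), profit)
    acc["odo"][nxt["odo"]] = acc_average(acc["odo"].get(nxt["odo"], {}), profit)
    return acc

cleaned = [{"profit": 1000.0, "mpg": "high", "odo": "low"}]  # hypothetical

# With a bare {} initializer, acc["mpg"] does not exist on the first call.
try:
    reduce(sort_and_add, cleaned, {})
    empty_init_failed = False
except KeyError as err:
    empty_init_failed = True
    print("KeyError:", err)  # KeyError: 'mpg'

# Seeding the two top-level keys lets the get() defaults handle everything else.
results = reduce(sort_and_add, cleaned, {"mpg": {}, "odo": {}})
print(results["mpg"]["high"]["average"])  # 1000.0
```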
After running the code, you can settle the debate between the two salesmen about which car category makes the most profit. The result is here:
The cars.json file is available here on GitHub, along with the complete code.
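Once you have the results dict, reading off the winner in each dimension takes one max call per group. In this sketch, best_category is a hypothetical helper and the results dict is made-up data in the same shape the script prints; these are not the actual numbers from cars.json.

```python
# Made-up results in the same nested shape the script prints.
results = {
    "mpg": {
        "low": {"total": 2000.0, "count": 2, "average": 1000.0},
        "high": {"total": 9000.0, "count": 3, "average": 3000.0},
    },
    "odo": {
        "medium": {"total": 5000.0, "count": 2, "average": 2500.0},
        "high": {"total": 1200.0, "count": 1, "average": 1200.0},
    },
}

def best_category(groups):
    """Return the (category, stats) pair with the highest average profit."""
    return max(groups.items(), key=lambda item: item[1]["average"])

for dimension, groups in results.items():
    name, stats = best_category(groups)
    print(f"{dimension}: {name} ({stats['average']:.2f} average profit over {stats['count']} cars)")
# mpg: high (3000.00 average profit over 3 cars)
# odo: medium (2500.00 average profit over 2 cars)
```

Printing the count next to the average is worthwhile: a category with only a handful of cars can post a high average by chance.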