2. Spark Basics and the RDD Interface
1. What’s new in Spark 3
2. Introduction to Spark








3. The Resilient Distributed Dataset (RDD)








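Before the walkthroughs, a minimal sketch of the RDD lifecycle: create an RDD, chain lazy transformations, then trigger the actual work with an action (the sample values below are illustrative, not from the course datasets):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('RDDBasics')
sc = SparkContext(conf=conf)

nums = sc.parallelize([1, 2, 3, 4])            # create an RDD from a Python list
squares = nums.map(lambda x: x * x)            # transformation: lazy, nothing runs yet
evens = squares.filter(lambda x: x % 2 == 0)   # another lazy transformation
print(evens.collect())                         # action: triggers the job -> [4, 16]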
4. Ratings Histogram Walkthrough






# Hand the script to Spark to run:
# spark-submit ratings-counter.py
from pyspark import SparkConf, SparkContext
import collections
import os

os.environ['PYSPARK_PYTHON'] = "python"
conf = SparkConf().setMaster('local').setAppName('RatingsHistogram')
sc = SparkContext(conf=conf)

lines = sc.textFile('../dataset/fakefriends.csv')
ratings = lines.map(lambda x: x.split(',')[2])   # take the third comma-separated field as the rating
result = ratings.countByValue()                  # action: returns a dict of value -> count

# Sort by rating value and print a simple histogram
sortedResults = collections.OrderedDict(sorted(result.items()))
for k, v in sortedResults.items():
    print("%s %i" % (k, v))
5. Key/Value RDDs, and the Average Friends by Age Example









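In miniature, the pattern the next activity uses: mapValues() transforms only the values of a pair RDD, and reduceByKey() merges the values of matching keys (toy (age, numFriends) pairs, for illustration only):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('PairRDDSketch')
sc = SparkContext(conf=conf)

pairs = sc.parallelize([(33, 100), (33, 300), (45, 50)])
sums = pairs.mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(sums.collect())   # [(33, (400, 2)), (45, (50, 1))]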
6. [Activity] Running the Average Friends by Age Example
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "python"
conf = SparkConf().setMaster('local').setAppName('FriendsByAge')
sc = SparkContext(conf=conf)

def parseLine(line):
    # Parse a CSV row into an (age, numFriends) key/value pair
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

lines = sc.textFile('../dataset/fakefriends.csv')
rdd = lines.map(parseLine)

# Pair every friend count with a 1, then sum both per age: (sum, count)
totalsByAge = rdd.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])   # sum / count
results = averagesByAge.collect()
for res in results:
    print(res)
7. Filtering RDDs, and the Minimum Temperature by Location Example







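filter() on its own: it keeps only the elements for which the predicate returns True (the station rows below are made-up examples in the shape of the 1800.csv data):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('FilterSketch')
sc = SparkContext(conf=conf)

entries = sc.parallelize([('EZE00100082', 'TMIN', 5.4), ('EZE00100082', 'TMAX', 18.7)])
mins = entries.filter(lambda x: x[1] == 'TMIN')   # keep only the minimum-temperature rows
print(mins.collect())                             # [('EZE00100082', 'TMIN', 5.4)]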
8. [Activity] Running the Minimum Temperature Example, and Modifying it for Maximums
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "python"
conf = SparkConf().setMaster('local').setAppName('min-temp')
sc = SparkContext(conf=conf)

def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    # Raw readings are tenths of degrees Celsius; convert to Fahrenheit
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile('../dataset/1800.csv')
rdd = lines.map(parseLine)
minTemps = rdd.filter(lambda x: "TMIN" in x[1])       # keep only minimum-temperature entries
stationTemps = minTemps.map(lambda x: (x[0], x[2]))   # (stationID, temperature) pairs
minTemps = stationTemps.reduceByKey(lambda x, y: min(x, y))
results = minTemps.collect()
for res in results:
    print(res[0] + "\t{:.2f}F".format(res[1]))
9. [Activity] Running the Maximum Temperature by Location Example
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "python"
conf = SparkConf().setMaster('local').setAppName('max-temp')
sc = SparkContext(conf=conf)

def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile('../dataset/1800.csv')
rdd = lines.map(parseLine)
# Same pipeline as the minimum script, but filter TMAX and reduce with max()
maxTemps = rdd.filter(lambda x: "TMAX" in x[1])
stationTemps = maxTemps.map(lambda x: (x[0], x[2]))
maxTemps = stationTemps.reduceByKey(lambda x, y: max(x, y))
results = maxTemps.collect()
for res in results:
    print(res[0] + "\t{:.2f}F".format(res[1]))
10. [Activity] Counting Word Occurrences using flatMap()



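The key difference from map(): flatMap() may emit zero or more output elements per input element, and the results are flattened into a single RDD rather than an RDD of lists (toy lines for illustration):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('FlatMapSketch')
sc = SparkContext(conf=conf)

lines = sc.parallelize(['the quick brown fox', 'jumped'])
print(lines.map(lambda x: x.split()).collect())      # [['the', 'quick', 'brown', 'fox'], ['jumped']]
print(lines.flatMap(lambda x: x.split()).collect())  # ['the', 'quick', 'brown', 'fox', 'jumped']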
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "python"
conf = SparkConf().setMaster('local').setAppName('WordCount')
sc = SparkContext(conf=conf)

input = sc.textFile('../dataset/Book.txt')
words = input.flatMap(lambda x: x.split())   # one flattened output element per word
wordCounts = words.countByValue()
for word, count in wordCounts.items():
    cleanWord = word.encode('ascii', 'ignore')   # strip non-ASCII characters so printing never fails
    if (cleanWord):
        print(cleanWord, count)
11. [Activity] Improving the Word Count Script with Regular Expressions

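Why the regex matters: split() alone leaves punctuation and case attached, so 'book' and 'Book,' count as different words. A quick check of what normalizeWords() returns on a sample line (the sentence is illustrative):

import re

def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

print(normalizeWords('Self-Employment: Building It!'))
# ['self', 'employment', 'building', 'it', '']
# The trailing '' is why the script below keeps the `if (cleanWord)` guard.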
from pyspark import SparkConf, SparkContext
import re
import os

os.environ['PYSPARK_PYTHON'] = "python"

def normalizeWords(text):
    # Split on any run of non-word characters and lowercase everything
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

conf = SparkConf().setMaster('local').setAppName('WordCountBetter')
sc = SparkContext(conf=conf)

input = sc.textFile('../dataset/Book.txt')
words = input.flatMap(normalizeWords)
wordCounts = words.countByValue()
for word, count in wordCounts.items():
    cleanWord = word.encode('ascii', 'ignore')
    if (cleanWord):
        print(cleanWord, count)
12. [Activity] Sorting the Word Count Results


from pyspark import SparkConf, SparkContext
import re
import os

os.environ['PYSPARK_PYTHON'] = "python"

def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

conf = SparkConf().setMaster('local').setAppName('WordCountBetter')
sc = SparkContext(conf=conf)

input = sc.textFile('../dataset/Book.txt')
words = input.flatMap(normalizeWords)
# Count with reduceByKey instead of countByValue so the sort can run on the cluster
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
# Flip (word, count) to (count, word) so sortByKey orders by count
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey()
results = wordCountsSorted.collect()
for res in results:
    count = str(res[0])
    word = res[1].encode('ascii', 'ignore')
    if (word):
        print(word, ':\t\t', count)
13. [Exercise] Find the Total Amount Spent by Customer



14. [Exercise] Check Your Results, and Now Sort Them by Total Amount Spent
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "python"
conf = SparkConf().setMaster('local').setAppName('Customer')
sc = SparkContext(conf=conf)

def extractCustomerPricePairs(line):
    # Each row is customerID,itemID,amountSpent
    fields = line.split(',')
    return (int(fields[0]), float(fields[2]))

input = sc.textFile('../dataset/customer-orders.csv')
mappedInput = input.map(extractCustomerPricePairs)
totalByCustomer = mappedInput.reduceByKey(lambda x, y: x + y)   # sum spending per customer
results = totalByCustomer.collect()
for res in results:
    print(res)
15. Check Your Sorted Implementation and Results Against Mine
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "python"
conf = SparkConf().setMaster('local').setAppName('Customer')
sc = SparkContext(conf=conf)

def extractCustomerPricePairs(line):
    fields = line.split(',')
    return (int(fields[0]), float(fields[2]))

input = sc.textFile('../dataset/customer-orders.csv')
mappedInput = input.map(extractCustomerPricePairs)
totalByCustomer = mappedInput.reduceByKey(lambda x, y: x + y)
# Flip to (total, customerID) so sortByKey orders customers by spending
flipped = totalByCustomer.map(lambda x: (x[1], x[0]))
totalByCustomerSorted = flipped.sortByKey()
results = totalByCustomerSorted.collect()
for res in results:
    print(res)
3. SparkSQL, DataFrames, and DataSets
1. Introducing SparkSQL







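The core idea in miniature: wrap your data in a DataFrame, optionally register it as a view, and then query it either with SQL text or with DataFrame methods (toy rows, for illustration only):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkSQLSketch').getOrCreate()

df = spark.createDataFrame([(25, 'Alice'), (19, 'Bob')], ['age', 'name'])
df.createOrReplaceTempView('people')

spark.sql('SELECT name FROM people WHERE age < 21').show()   # SQL style
df.filter(df.age < 21).select('name').show()                 # DataFrame-method style
spark.stop()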
2. [Activity] Executing SQL commands and SQL-style functions on a DataFrame
from pyspark.sql import SparkSession, Row
import os

os.environ['PYSPARK_PYTHON'] = "python"
spark = SparkSession.builder.appName('SparkSQL').getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), name=str(fields[1].encode('utf-8')),
               age=int(fields[2]), numFriends=int(fields[3]))

lines = spark.sparkContext.textFile('../dataset/fakefriends.csv')
people = lines.map(mapper)

# Infer a schema from the Row objects and register the DataFrame as a SQL view
schemaPeople = spark.createDataFrame(people).cache()
schemaPeople.createOrReplaceTempView('people')

# SQL can now be run against the 'people' view
teenagers = spark.sql('SELECT * FROM people WHERE age >= 13 AND age <= 19')
for teen in teenagers.collect():
    print(teen)

# SQL-style functions are also available directly on the DataFrame
schemaPeople.groupBy('age').count().orderBy('age').show()
spark.stop()
3. Using DataFrames instead of RDDs
from pyspark.sql import SparkSession
import os

os.environ['PYSPARK_PYTHON'] = "python"
spark = SparkSession.builder.appName('SparkSQL').getOrCreate()

# Read the CSV straight into a DataFrame, letting Spark infer the schema from the header
people = spark.read.option('header', 'true') \
    .option('inferSchema', 'true') \
    .csv('../dataset/fakefriends-header.csv')

print("Here is our inferred schema:")
people.printSchema()
print("Let's display the name column:")
people.select("name").show()
print("Filter out anyone over 21:")
people.filter(people.age < 21).show()
print("Group by age:")
people.groupBy("age").count().show()
print("Make everyone 10 years older:")
people.select(people.name, people.age + 10).show()
spark.stop()
4. [Exercise] Friends by Age, with DataFrames


5. Exercise Solution Friends by Age, with DataFrames
from pyspark.sql import SparkSession, functions as func
import os

os.environ['PYSPARK_PYTHON'] = "python"
spark = SparkSession.builder.appName('FriendsByAge').getOrCreate()

lines = spark.read.option('header', 'true') \
    .option('inferSchema', 'true') \
    .csv('../dataset/fakefriends-header.csv')

# Select only the columns we need
friendsByAge = lines.select('age', 'friends')

# Average friends by age
friendsByAge.groupBy('age').avg('friends').show()
# Sorted by age
friendsByAge.groupBy('age').avg('friends').sort('age').show()
# Rounded to two decimal places
friendsByAge.groupBy('age').agg(func.round(func.avg('friends'), 2)).sort('age').show()
# With a nicer column name
(friendsByAge.groupBy('age').agg(func.round(func.avg('friends'), 2).alias('friends_avg'))
    .sort('age').show())
spark.stop()
6. [Activity] Word Count, with DataFrames
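A sketch of the DataFrame version of word count, using explode() to play the role flatMap() played on the RDD (paths and names assume the same ../dataset/Book.txt as the earlier scripts):

from pyspark.sql import SparkSession, functions as func
import os

os.environ['PYSPARK_PYTHON'] = "python"
spark = SparkSession.builder.appName('WordCount').getOrCreate()

# Read each line of the book into a one-column ('value') DataFrame
inputDF = spark.read.text('../dataset/Book.txt')

# Split lines on non-word characters and explode into one row per word
words = inputDF.select(func.explode(func.split(inputDF.value, '\\W+')).alias('word'))
words = words.filter(words.word != '')

# Normalize to lowercase, then count occurrences and sort by count
lowercaseWords = words.select(func.lower(words.word).alias('word'))
wordCounts = lowercaseWords.groupBy('word').count().sort('count')
wordCounts.show(wordCounts.count())
spark.stop()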