Taming Big Data with Apache Spark and Python - Hands On!

2. Spark Basics and the RDD Interface

1. What’s new in Spark 3

2. Introduction to Spark


3. The Resilient Distributed Dataset (RDD)
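
A quick refresher before the first walkthrough: transformations such as map() and filter() only describe a new RDD and are evaluated lazily; an action such as collect() or countByValue() is what actually triggers the computation. A minimal sketch (the app name and data here are illustrative, not from the course):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('RDDBasics')
sc = SparkContext(conf=conf)

nums = sc.parallelize([1, 2, 3, 4])           # create an RDD from a Python list
squares = nums.map(lambda x: x * x)           # transformation: nothing runs yet
evens = squares.filter(lambda x: x % 2 == 0)  # another lazy transformation
print(evens.collect())                        # action: triggers the job -> [4, 16]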


4. Ratings Histogram Walkthrough


# Submit the script to Spark for execution:
spark-submit ratings-counter.py
from pyspark import SparkConf, SparkContext
import collections

# Workaround for: org.apache.spark.SparkException: Python worker failed to connect back.
import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable

conf = SparkConf().setMaster('local').setAppName('RatingsHistogram')
sc = SparkContext(conf=conf)  # create the Spark context

# lines = sc.textFile('file:///SparkCourse/ml-100k/u.data')
lines = sc.textFile('../dataset/fakefriends.csv')

# take the third field (index 2) of each line
ratings = lines.map(lambda x: x.split(',')[2])
# count how many times each distinct value appears
result = ratings.countByValue()

# sort the results by key
sortedResults = collections.OrderedDict(sorted(result.items()))
for k, v in sortedResults.items():
    print("%s %i" % (k, v))

5. KeyValue RDD’s, and the Average Friends by Age Example
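
Before the runnable example in the next section, here is a minimal sketch (with illustrative data, not from the course files) of the key/value RDD operations this lecture introduces:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('KeyValueBasics')
sc = SparkContext(conf=conf)

pairs = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])

pairs.reduceByKey(lambda x, y: x + y).collect()   # e.g. [('a', 4), ('b', 2)] - combine values per key
pairs.groupByKey().mapValues(list).collect()      # e.g. [('a', [1, 3]), ('b', [2])]
pairs.mapValues(lambda v: v * 10).collect()       # transform only the values, keys stay the same
pairs.sortByKey().collect()                       # sort by key
pairs.keys().collect()                            # just the keys; pairs.values() gives just the values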



6. [Activity] Running the Average Friends by Age Example

from pyspark import SparkConf, SparkContext

import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable

conf = SparkConf().setMaster('local').setAppName('FriendsByAge')
sc = SparkContext(conf=conf)  # create the Spark context


def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)


lines = sc.textFile('../dataset/fakefriends.csv')
# run parseLine on every line of the CSV
rdd = lines.map(parseLine)
# (age, numFriends) -> (age, (numFriends, 1)) -> (age, (totalFriends, occurrenceCount))
# reduceByKey: combine the values that share the same key
totalsByAge = rdd.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
results = averagesByAge.collect()
for res in results:
    print(res)

7. Filtering RDD’s, and the Minimum Temperature by Location Example


8. [Activity] Running the Minimum Temperature Example, and Modifying it for Maximums

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable

conf = SparkConf().setMaster('local').setAppName('min-temp')
sc = SparkContext(conf=conf)  # create the Spark context


def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    # the raw value is in tenths of degrees Celsius; convert to degrees Fahrenheit
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)


lines = sc.textFile('../dataset/1800.csv')
# run parseLine on every line of the CSV
rdd = lines.map(parseLine)
# x[1] is the entry type; keep only the TMIN (minimum temperature) entries
minTemps = rdd.filter(lambda x: "TMIN" in x[1])
# drop the entry type, keeping only (stationID, temperature)
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
# for each station (key), keep the lower of the two temperatures
minTemps = stationTemps.reduceByKey(lambda x, y: min(x, y))
results = minTemps.collect()

for res in results:
    print(res[0] + "\t{:.2f}F".format(res[1]))

9. [Activity] Running the Maximum Temperature by Location Example

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable

conf = SparkConf().setMaster('local').setAppName('max-temp')
sc = SparkContext(conf=conf)  # create the Spark context


def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    # the raw value is in tenths of degrees Celsius; convert to degrees Fahrenheit
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)


lines = sc.textFile('../dataset/1800.csv')
# run parseLine on every line of the CSV
rdd = lines.map(parseLine)
# x[1] is the entry type; keep only the TMAX (maximum temperature) entries
maxTemps = rdd.filter(lambda x: "TMAX" in x[1])
# drop the entry type, keeping only (stationID, temperature)
stationTemps = maxTemps.map(lambda x: (x[0], x[2]))
# for each station (key), keep the higher of the two temperatures
maxTemps = stationTemps.reduceByKey(lambda x, y: max(x, y))
results = maxTemps.collect()

for res in results:
    print(res[0] + "\t{:.2f}F".format(res[1]))

10. [Activity] Counting Word Occurrences using flatmap()

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable

conf = SparkConf().setMaster('local').setAppName('WordCount')
sc = SparkContext(conf=conf)  # create the Spark context

input = sc.textFile('../dataset/Book.txt')
# flatMap: each input element can map to zero or more outputs, which are flattened into a single RDD
# here, split each line on whitespace and flatten the pieces into individual words
words = input.flatMap(lambda x: x.split())
# count how many times each word appears: word -> count
wordCounts = words.countByValue()

for word, count in wordCounts.items():
    cleanWord = word.encode('ascii', 'ignore')
    if (cleanWord):
        print(cleanWord, count)
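
To make the difference between map() and flatMap() concrete, here is a minimal sketch (illustrative data, reusing the sc already created above) contrasting the two on the same input:

lines_rdd = sc.parallelize(["Hello World", "Goodbye World"])

# map: exactly one output element per input element, so we get an RDD of lists
lines_rdd.map(lambda x: x.split()).collect()      # [['Hello', 'World'], ['Goodbye', 'World']]

# flatMap: the per-line lists are flattened into a single RDD of words
lines_rdd.flatMap(lambda x: x.split()).collect()  # ['Hello', 'World', 'Goodbye', 'World']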

11. [Activity] Improving the Word Count Script with Regular Expressions

from pyspark import SparkConf, SparkContext
import re
import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable


# extract only words by splitting on non-word characters, dropping punctuation
def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())


conf = SparkConf().setMaster('local').setAppName('WordCountBetter')
sc = SparkContext(conf=conf)  # create the Spark context

input = sc.textFile('../dataset/Book.txt')
# flatMap each line through normalizeWords, flattening the results into individual lowercase words
words = input.flatMap(normalizeWords)
# count how many times each word appears: word -> count
wordCounts = words.countByValue()

for word, count in wordCounts.items():
    cleanWord = word.encode('ascii', 'ignore')
    if (cleanWord):
        print(cleanWord, count)

12. [Activity] Sorting the Word Count Results

from pyspark import SparkConf, SparkContext
import re
import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable


# extract only words by splitting on non-word characters, dropping punctuation
def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())


conf = SparkConf().setMaster('local').setAppName('WordCountBetter')
sc = SparkContext(conf=conf)  # create the Spark context

input = sc.textFile('../dataset/Book.txt')
# flatMap each line through normalizeWords, flattening the results into individual lowercase words
words = input.flatMap(normalizeWords)

# word -> (word, 1), then add the 1s together for each word -> (word, count)
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
# flip to (count, word) so we can sort by count
# wordCountsSorted = wordCounts.map(lambda (x,y): (y,x)).sortByKey()  # tuple parameter unpacking in lambdas was removed in Python 3
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey()

results = wordCountsSorted.collect()

for res in results:
    count = str(res[0])
    word = res[1].encode('ascii', 'ignore')
    if (word):
        print(word, ':\t\t', count)

13. [Exercise] Find the Total Amount Spent by Customer

The columns in customer-orders.csv are: customer ID, item ID, amount spent.

14. [Exercise] Check your Results, and Now Sort them by Total Amount Spent.

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable

conf = SparkConf().setMaster('local').setAppName('Customer')
sc = SparkContext(conf=conf)  # create the Spark context


def extractCustomerPricePairs(line):
    fields = line.split(',')
    return (int(fields[0]), float(fields[2]))


input = sc.textFile('../dataset/customer-orders.csv')
mappedInput = input.map(extractCustomerPricePairs)
totalByCustomer = mappedInput.reduceByKey(lambda x, y: x + y)

results = totalByCustomer.collect()
for res in results:
    print(res)

15. Check Your Sorted Implementation and Results Against Mine.

from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable

conf = SparkConf().setMaster('local').setAppName('Customer')
sc = SparkContext(conf=conf)  # create the Spark context


def extractCustomerPricePairs(line):
    fields = line.split(',')
    return (int(fields[0]), float(fields[2]))


input = sc.textFile('../dataset/customer-orders.csv')
mappedInput = input.map(extractCustomerPricePairs)
totalByCustomer = mappedInput.reduceByKey(lambda x, y: x + y)

flipped = totalByCustomer.map(lambda x: (x[1], x[0]))
totalByCustomerSorted = flipped.sortByKey()

results = totalByCustomerSorted.collect()
for res in results:
    print(res)

3. SparkSQL, DataFrames, and DataSets

1. Introducing SparkSQL

These days Spark DataFrames are used much more than the lower-level RDD API.

2. [Activity] Executing SQL commands and SQL-style functions on a DataFrame

from pyspark.sql import SparkSession, Row

# Workaround for: org.apache.spark.SparkException: Python worker failed to connect back.
import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable

# create session
spark = SparkSession.builder.appName('SparkSQL').getOrCreate()


def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), name=str(fields[1].encode('utf-8')),
               age=int(fields[2]), numFriends=int(fields[3]))


lines = spark.sparkContext.textFile('../dataset/fakefriends.csv')
people = lines.map(mapper)

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people).cache()
schemaPeople.createOrReplaceTempView('people')  # register a temporary view named "people"

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql('SELECT * FROM people WHERE age >= 13 AND age <= 19')

# The result of a SQL query is a DataFrame that supports all the normal DataFrame operations.
for teen in teenagers.collect():
    print(teen)

# We can also use functions instead of SQL queries:
schemaPeople.groupBy('age').count().orderBy('age').show()

spark.stop()

3. Using DataFrames instead of RDD’s

from pyspark.sql import SparkSession

# Workaround for: org.apache.spark.SparkException: Python worker failed to connect back.
import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable

# create session
spark = SparkSession.builder.appName('SparkSQL').getOrCreate()

people = spark.read.option('header', 'true') \
    .option('inferSchema', 'true') \
    .csv('../dataset/fakefriends-header.csv')

print("Here is our inferred schema:")
people.printSchema()

print("Let's display the name column:")
people.select("name").show()

print("Filter out anyone over 21:")
people.filter(people.age < 21).show()

print("Group by age")
people.groupBy("age").count().show()

print("Make everyone 10 years older:")
people.select(people.name, people.age + 10).show()

spark.stop()

4. [Exercise] Friends by Age, with DataFrames

5. Exercise Solution: Friends by Age, with DataFrames

from pyspark.sql import SparkSession, functions as func

# Workaround for: org.apache.spark.SparkException: Python worker failed to connect back.
import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable

spark = SparkSession.builder.appName('FriendsByAge').getOrCreate()

lines = spark.read.option('header', 'true') \
    .option('inferSchema', 'true') \
    .csv('../dataset/fakefriends-header.csv')

# Select only the age and friends columns
friendsByAge = lines.select('age', 'friends')

# From friendsByAge we group by "age" and then compute average
friendsByAge.groupby('age').avg('friends').show()

# Sorted
friendsByAge.groupby('age').avg('friends').sort('age').show()

# Formatted more nicely
# agg() lets us apply aggregate functions, here a rounded average
friendsByAge.groupby('age').agg(func.round(func.avg('friends'), 2)).sort('age').show()

# With a custom column name
(friendsByAge.groupby('age').agg(func.round(func.avg('friends'), 2).alias('friends_avg'))
 .sort('age').show())

spark.stop()

6. [Activity] Word Count, with DataFrames
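
No notes have been added for this activity yet. As a starting point, here is a minimal sketch of a DataFrame-based word count in the spirit of the earlier RDD version (the path, app name, and workaround follow the earlier examples; the course's own script may differ in details):

from pyspark.sql import SparkSession, functions as func
import os

os.environ['PYSPARK_PYTHON'] = "python"  # point PySpark at the Python executable

spark = SparkSession.builder.appName('WordCountDF').getOrCreate()

# Read each line of the book into a DataFrame with a single "value" column
inputDF = spark.read.text('../dataset/Book.txt')

# Split each line on non-word characters, explode into one word per row, drop empties, lowercase
words = inputDF.select(func.explode(func.split(inputDF.value, '\\W+')).alias('word'))
words = words.filter(words.word != '')
lowercaseWords = words.select(func.lower(words.word).alias('word'))

# Count occurrences of each word and sort by count
wordCounts = lowercaseWords.groupBy('word').count().sort('count')
wordCounts.show(wordCounts.count())

spark.stop()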

