1 - Introduction to Course
2 - Setting up Python with Spark
4 - Local VirtualBox Setup
sudo apt-get install python3-pip
pip3 install jupyter -i <mirror-url>   # optional: point pip at a PyPI mirror
# sudo apt-get install default-jre
sudo apt-get install openjdk-8-jdk
java -version
sudo apt-get install scala
scala -version
# 2.11.6
pip3 install py4j
sudo tar -zxvf ...
export SPARK_HOME='/home/malred/...'
export PATH=$SPARK_HOME/bin:$PATH   # bin/ holds the pyspark and spark-submit launchers
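A quick way to confirm the setup works end-to-end, assuming the same install path used in the later sections:
import findspark
findspark.init('/home/malred/spark-3.3.2-bin-hadoop3')
from pyspark.sql import SparkSession
# If this prints a version (e.g. 3.3.2), Spark and Java are wired up correctly
spark = SparkSession.builder.appName('check').getOrCreate()
print(spark.version)
spark.stop()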
7 - Python Crash Course
8 - Spark DataFrame Basics
24 - Introduction to Spark DataFrames
25 - Spark DataFrame Basics
people.json
[
{
"name": "Michael",
"age": 12
},
{
"name": "Andy",
"age": 13
},
{
"name": "Justin",
"age": 8
}
]
import findspark
findspark.init('/home/malred/spark-3.3.2-bin-hadoop3')
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Basics').getOrCreate()
df = spark.read.option("multiline","true").json("people.json")
df.show()
df.printSchema()
print(df.columns)
df.describe().show()
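If you want to control the column types rather than rely on inference, you can supply an explicit schema; a minimal sketch for the same people.json fields:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
# Define the columns explicitly: name as string, age as integer, both nullable
data_schema = [StructField('name',StringType(),True),
               StructField('age',IntegerType(),True)]
final_struc = StructType(fields=data_schema)
df = spark.read.option("multiline","true").schema(final_struc).json("people.json")
df.printSchema()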
26 - Spark DataFrame Basics Part Two
import findspark
findspark.init('/home/malred/spark-3.3.2-bin-hadoop3')
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Basics').getOrCreate()
df = spark.read.option("multiline","true").json("people.json")
df.show()
df.printSchema()
df.columns
df.describe().show()
# Select a specific column
df.select('age').show()
# Take the first 2 rows and grab the one at index 0
df.head(2)[0]
type(df.head(2)[0])
df.select('age','name').show()
# Add a column whose values are the existing age column multiplied by 2
df.withColumn('newage',df['age']*2).show()
# Rename a column
df.withColumnRenamed('age','my_new_age').show()
# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView('people')
res=spark.sql('SELECT * FROM people')
res.show()
age_eq_13=spark.sql('select * from people where age = 13')
age_eq_13.show()
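Any SQL that Spark supports can now run against the registered view; for example an aggregate, shown here next to the equivalent DataFrame call:
# Average age via SQL on the temporary view
spark.sql('SELECT AVG(age) AS avg_age FROM people').show()
# The same aggregate through the DataFrame API
from pyspark.sql.functions import avg
df.select(avg('age').alias('avg_age')).show()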
27 - Spark DataFrame Basic Operations
# import findspark
# findspark.init('/home/malred/spark-3.3.2-bin-hadoop3')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ops').getOrCreate()
# header=True means the first row holds the column names
df = spark.read.csv('train.csv',inferSchema=True,header=True)
df.printSchema()
df.show()
df.head(3)[0]
# Filter rows with a SQL-style expression
df.filter('Sex == "male"').show()
# select chooses which columns to display
df.filter('Sex == "female"').select('Sex').show()
# select also accepts a list of columns
df.filter('Sex == "female"').select(['Sex','Name']).show()
# Alternative filter syntax using column expressions
df.filter(df['Sex']=='male').select('Ticket').show()
# Filter on multiple conditions (wrap each in parentheses and combine with &)
df.filter((df['Sex']=='male') & (df['Pclass']>1)).show()
# Negate a condition with ~
df.filter(~(df['Pclass']>1)).show()
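Column objects also provide isin and between; the two filters below select the same rows, written both ways:
# isin: rows whose Pclass is 1 or 2
df.filter(df['Pclass'].isin(1,2)).select(['Name','Pclass']).show()
# between: the same rows expressed as an inclusive range check
df.filter(df['Pclass'].between(1,2)).select(['Name','Pclass']).show()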
# collect returns the result as a list of Row objects
res = df.filter(df['Pclass']==2).collect()
print(res)
row = res[0]
# Convert the Row to a dictionary
print(row.asDict())
print(row.asDict()['Pclass'])
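Row fields can also be read as attributes, and small filtered results can be pulled into pandas for local inspection (assumes pandas is installed):
# Attribute access on a Row, equivalent to the asDict() lookups above
print(row.Pclass, row.Name)
# Convert a small result to a pandas DataFrame (only sensible when it fits in memory)
pdf = df.filter(df['Pclass']==2).select(['Name','Pclass']).toPandas()
print(pdf.head())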
28 - Groupby and Aggregate Operations
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('aggs').getOrCreate()
df = spark.read.csv('EcommerceCustomers.csv',inferSchema=True,header=True)
df.show()
df.printSchema()
# Group rows by a column
df.groupBy('Avatar')
# Group and take the mean of the numeric columns
df.groupBy('Avatar').mean().show()
df.groupBy('Avatar').count().show()
# Aggregate: sum of the 'Time on Website' column over the whole DataFrame
df.agg({'Time on Website':'sum'}).show()
df.agg({'Time on Website':'max'}).show()
group_data = df.groupBy('Avatar')
# Dict-style aggregation applied to the grouped data
group_data.agg({'Time on Website':'max'}).show()
from pyspark.sql.functions import countDistinct,avg,stddev
# Count the number of distinct values
df.select(countDistinct('Time on Website')).show()
# avg computes the mean; alias gives the result column a new name
df.select(avg('Time on Website').alias('website')).show()
# stddev computes the standard deviation
df.select(stddev('Time on Website')).show()
from pyspark.sql.functions import format_number
web_std = df.select(stddev('Time on Website').alias('web'))
web_std.show()
# Keep 2 decimal places
web_std.select(format_number('web',2).alias('web')).show()
df.show()
# Sort ascending
df.orderBy('Time on App').select('Time on App').show()
# Sort descending
df.orderBy(df['Time on App'].desc()).select(['Time on App','Email']).show()
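These pieces chain together; a sketch that groups by Avatar, aggregates with named result columns, sorts, and formats in one pass:
from pyspark.sql.functions import avg,stddev,format_number
# Per-Avatar average and standard deviation of 'Time on Website', sorted by the average
summary = (df.groupBy('Avatar')
             .agg(avg('Time on Website').alias('avg_web'),
                  stddev('Time on Website').alias('std_web'))
             .orderBy('avg_web',ascending=False))
summary.select('Avatar',
               format_number('avg_web',2).alias('avg_web'),
               format_number('std_web',2).alias('std_web')).show()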
29 - Missing Data
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('miss').getOrCreate()
df = spark.read.csv('miss.csv',header=True,inferSchema=True)
df.show()
# Drop rows that contain missing values
df.na.drop().show()
# thresh: keep only rows that have at least `thresh` non-null values
df.na.drop(thresh=2).show()
# how: drop strategy ('any' drops rows with any null, 'all' only rows that are entirely null)
df.na.drop(how='all').show()
# subset: only nulls in the listed columns cause a row to be dropped
df.na.drop(subset=['Name']).show()
# PySpark infers the column types, which helps when filling missing values
df.printSchema()
# Passing a string fills only the string-typed columns
df.na.fill('FILL VALUE').show()
# Passing a number fills only the numeric columns
df.na.fill(0).show()
# Fill a specific value into the given columns
df.na.fill('No Name',subset=['Name']).show()
from pyspark.sql.functions import mean
mean_val = df.select(mean(df['Sales'])).collect()
mean_sale = mean_val[0][0]
mean_sale
# Fill missing values in Sales with the column's mean
df.na.fill(mean_sale,['Sales']).show()
# The same thing as a one-liner
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0],['Sales']).show()
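The one-liner above is dense; a small helper (hypothetical name fill_with_mean) makes the same pattern reusable for any numeric column:
from pyspark.sql.functions import mean

def fill_with_mean(frame, col_name):
    # Compute the column mean on the driver, then fill nulls in that column with it
    col_mean = frame.select(mean(frame[col_name])).collect()[0][0]
    return frame.na.fill(col_mean, subset=[col_name])

fill_with_mean(df, 'Sales').show()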
30 - Dates and Timestamps
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('dates').getOrCreate()
df = spark.read.csv('appl_stock.csv',header=True,inferSchema=True)
df.head(1)
df.select(['Date','Open']).show()
from pyspark.sql.functions import (dayofmonth,hour,dayofyear,
                                   month,year,weekofyear,
                                   format_number,date_format)
# Extract the day of month from each Date value (year-month-day)
df.select(dayofmonth(df['Date'])).show()
# Extract the month from each Date value
df.select(month(df['Date'])).show()
# Extract the year from each Date value
df.select(year(df['Date'])).show()
# Add the year from Date as a new column on the original DataFrame
newdf = df.withColumn('Year',year(df['Date']))
# Average Close per year
result = newdf.groupBy('Year').mean().select(['Year','avg(Close)'])
# Rename the aggregated column
new = result.withColumnRenamed('avg(Close)','Average Closing Price')
# Format the new column to two decimal places
new.select(['Year',format_number('Average Closing Price',2).alias('avg close')]).show()
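The same approach works at monthly granularity with date_format, which is already imported above; a sketch on the same appl_stock data:
# Build a 'yyyy-MM' key from Date, then average Close per month
monthly = (df.withColumn('Month',date_format(df['Date'],'yyyy-MM'))
             .groupBy('Month')
             .mean()
             .select(['Month','avg(Close)'])
             .orderBy('Month'))
monthly.select(['Month',format_number('avg(Close)',2).alias('avg close')]).show()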