Spark and Python for Big Data with PySpark

1 - Introduction to Course


































2 - Setting up Python with Spark

4 - Local VirtualBox Setup

I'm using version 20 (Ubuntu).
Recommended: 4-8 GB of memory for the VM
Recommended: 20-50 GB of disk space

sudo apt-get install python3-pip
pip3 install jupyter -i <pypi-mirror-url>   # -i optionally points pip at a faster PyPI mirror
# sudo apt-get install default-jre          # alternative: JRE only
sudo apt-get install openjdk-8-jdk          # Spark needs a JDK
java -version
sudo apt-get install scala
scala -version
# 2.11.6
pip3 install py4j                           # bridge between Python and the JVM

Download Spark (I used 3.3.2) and move it to the home directory.

sudo tar -zxvf ...
export SPARK_HOME='/home/malred/...'
export PATH=$SPARK_HOME/bin:$PATH   # add Spark's executables to PATH




The course's setup relies on findspark; with my Spark 3 installation it isn't needed, since PySpark can be found directly.
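For reference, a minimal sketch of both start-up styles (the spark path matches the install above; everything else is standard PySpark):

# course style: point findspark at the unpacked Spark directory before importing pyspark
import findspark
findspark.init('/home/malred/spark-3.3.2-bin-hadoop3')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('setup_check').getOrCreate()
print(spark.version)   # should print 3.3.2
spark.stop()

# without findspark: works when pyspark is importable on its own
# (pip-installed, or SPARK_HOME already on the Python path)
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName('setup_check').getOrCreate()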

7 - Python Crash Course

8 - Spark DataFrame Basics

24 - Introduction to Spark DataFrames


A Spark DataFrame stores data as rows and named columns.
It can read data from many different data sources.
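A minimal sketch of that idea (the commented-out file names are hypothetical; reading people.json is shown for real in the next lecture):

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('sources').getOrCreate()
# a DataFrame is rows plus named columns, built here directly from Python data
df = spark.createDataFrame([('Michael', 12), ('Andy', 13)], ['name', 'age'])
df.show()
# the same row/column structure can be loaded from many sources:
# spark.read.csv('file.csv', header=True, inferSchema=True)
# spark.read.json('file.json')
# spark.read.parquet('file.parquet')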



25 - Spark DataFrame Basics

people.json

[
  {
    "name": "Michael",
    "age": 12
  },
  {
    "name": "Andy",
    "age": 13
  },
  {
    "name": "Justin",
    "age": 8
  }
] 



import findspark
findspark.init('/home/malred/spark-3.3.2-bin-hadoop3')
import pyspark
from pyspark.sql import SparkSession
# every job starts from a SparkSession
spark=SparkSession.builder.appName('Basics').getOrCreate()
# multiline=true because people.json is a JSON array rather than line-delimited JSON
df = spark.read.option("multiline","true").json("people.json")
df.show()              # print the rows
df.printSchema()       # print the inferred schema
print(df.columns)      # list of column names
df.describe().show()   # summary statistics for the numeric columns
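By default JSON integers are inferred as long; if you want control over the types, a schema can be supplied explicitly. A minimal sketch using the standard StructType API, reusing the spark session above:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField('age', IntegerType(), True),   # True = nullable
    StructField('name', StringType(), True),
])
df = spark.read.option("multiline","true").schema(schema).json("people.json")
df.printSchema()   # age now shows as integer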

26 - Spark DataFrame Basics Part Two






import findspark
findspark.init('/home/malred/spark-3.3.2-bin-hadoop3')
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Basics').getOrCreate()
df = spark.read.option("multiline","true").json("people.json")
df.show()
df.printSchema()
df.columns
df.describe().show()
# select a single column (returns a DataFrame)
df.select('age').show()
# take the first 2 rows, then the one at index 0 (a Row object)
df.head(2)[0]
type(df.head(2)[0])
df.select('age','name').show()
# add a new column derived from the existing age column * 2
df.withColumn('newage',df['age']*2).show()
# rename a column
df.withColumnRenamed('age','my_new_age').show()
# register the DataFrame as a temporary SQL view
df.createOrReplaceTempView('people')
res=spark.sql('SELECT * FROM people')
res.show()
age_eq_13=spark.sql('select * from people where age = 13')
age_eq_13.show()
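spark.sql returns an ordinary DataFrame, so the SQL view and the DataFrame API can be mixed freely; a small usage sketch on the people view registered above:

adults = spark.sql('SELECT name, age FROM people WHERE age > 10')
# continue with DataFrame methods on the SQL result
adults.withColumn('age_next_year', adults['age'] + 1).show()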

27 - Spark DataFrame Basic Operations

# import findspark
# findspark.init('/home/malred/spark-3.3.2-bin-hadoop3')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ops').getOrCreate()
# header=True: the first row holds the column names
df = spark.read.csv('train.csv',inferSchema=True,header=True)
df.printSchema()
df.show()
df.head(3)[0]
# filter rows with a SQL-style condition string
df.filter('Sex == "male"').show()
# select which columns to display
df.filter('Sex == "female"').select('Sex').show()
# select multiple columns to display
df.filter('Sex == "female"').select(['Sex','Name']).show()
# filter, alternative syntax using column expressions
df.filter(df['Sex']=='male').select('Ticket').show()
# filter with multiple conditions (& = and, each condition in parentheses)
df.filter((df['Sex']=='male') & (df['Pclass']>1)).show()
# negate a condition with ~
df.filter(~(df['Pclass']>1)).show() 
# collect() returns the rows as a Python list
res = df.filter(df['Pclass']==2).collect()
print(res)
row = res[0]
# convert a Row to a dict
print(row.asDict())
print(row.asDict()['Pclass'])
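Beyond plain comparisons, Column objects also provide helpers such as isin and between (a sketch, assuming the usual Titanic train.csv columns Pclass and Age):

# rows whose Pclass is 1 or 2
df.filter(df['Pclass'].isin(1, 2)).select(['Name','Pclass']).show()
# rows whose Age falls in a range (inclusive)
df.filter(df['Age'].between(20, 30)).select(['Name','Age']).show()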

28 - Groupby and Aggregate Operations

from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName('aggs').getOrCreate() 
df = spark.read.csv('EcommerceCustomers.csv',inferSchema=True,header=True) 
df.show() 
df.printSchema() 
# group rows by a column (returns a GroupedData object)
df.groupBy('Avatar') 
# group, then take the mean of the numeric columns
df.groupBy('Avatar').mean().show() 
df.groupBy('Avatar').count().show() 
# aggregate across all rows: sum of the 'Time on Website' column
df.agg({'Time on Website':'sum'}).show() 
df.agg({'Time on Website':'max'}).show() 
group_data = df.groupBy('Avatar') 
# agg on grouped data (dict syntax: column -> aggregate function)
group_data.agg({'Time on Website':'max'}).show() 
from pyspark.sql.functions import countDistinct,avg,stddev 
# number of distinct values
df.select(countDistinct('Time on Website')).show() 
# avg computes the mean; alias renames the result column
df.select(avg('Time on Website').alias('website')).show() 
# stddev: standard deviation
df.select(stddev('Time on Website')).show() 
from pyspark.sql.functions import format_number 
web_std = df.select(stddev('Time on Website').alias('web')) 
web_std.show() 
# format_number keeps 2 decimal places
web_std.select(format_number('web',2).alias('web')).show() 
df.show() 
# sort ascending
df.orderBy('Time on App').select('Time on App').show() 
# sort descending
df.orderBy(df['Time on App'].desc()).select(['Time on App','Email']).show() 
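The dict syntax allows only one aggregate per column; with the functions imported above, several named aggregates can be computed in one agg call (a small sketch on the same grouped data):

group_data.agg(
    avg('Time on Website').alias('avg_website'),
    stddev('Time on App').alias('std_app'),
).show()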

29 - Missing Data

Create the CSV file (miss.csv) with some missing values.
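The exact file contents aren't shown in the notes; a hypothetical miss.csv consistent with the columns the code below references (Id is assumed, Name and Sales appear in the code), with some cells left empty:

Id,Name,Sales
emp1,John,
emp2,,
emp3,,345.0
emp4,Cindy,456.0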








#!/usr/bin/env python
# coding: utf-8

# In[47]:


from pyspark.sql import SparkSession


# In[48]:


spark = SparkSession.builder.appName('miss').getOrCreate()


# In[49]:


df = spark.read.csv('miss.csv',header=True,inferSchema=True)


# In[50]:


df.show()


# In[51]:


# drop any row that contains a missing value
df.na.drop().show()


# In[52]:


# thresh: keep rows that have at least `thresh` non-null values; drop the rest
df.na.drop(thresh=2).show()


# In[53]:


# how: drop strategy ('any' drops a row with any null, 'all' only if every value is null)
df.na.drop(how='all').show()


# In[54]:


# subset: only drop rows whose nulls are in the given columns
df.na.drop(subset=['Name']).show()


# In[55]:


# pyspark infers column types automatically, which is very helpful when filling missing values
df.printSchema()


# In[56]:


# passing a string fills only the string-typed columns
df.na.fill('FILL VALUE').show()


# In[58]:


# passing a number fills only the numeric columns
df.na.fill(0).show()


# In[59]:


# fill a specific value into specific columns
df.na.fill('No Name',subset=['Name']).show()


# In[60]:


from pyspark.sql.functions import mean


# In[64]:


mean_val = df.select(mean(df['Sales'])).collect()
mean_sale = mean_val[0][0]
mean_sale


# In[65]:


# fill the missing values in a column with that column's mean
df.na.fill(mean_sale,['Sales']).show()


# In[66]:


df.na.fill(df.select(mean(df['Sales'])).collect()[0][0],['Sales']).show()
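For reference, na.fill also accepts a dict mapping column names to fill values, which covers several columns in a single call (a small sketch reusing mean_sale from above):

df.na.fill({'Name': 'No Name', 'Sales': mean_sale}).show()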

 

30 - Dates and Timestamps





#!/usr/bin/env python
# coding: utf-8

# In[9]:


from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('miss').getOrCreate()
df = spark.read.csv('appl_stock.csv',header=True,inferSchema=True)


# In[10]:


df.head(1)


# In[12]:


df.select(['Date','Open']).show()


# In[16]:


from pyspark.sql.functions import (dayofmonth,hour,dayofyear,
                                   month,year,weekofyear,
                                   format_number,date_format)


# In[18]:


# extract the day of the month from each Date value (year-month-day)
df.select(dayofmonth(df['Date'])).show()


# In[19]:


# extract the month from each Date value
df.select(month(df['Date'])).show()


# In[20]:


# extract the year from each Date value
df.select(year(df['Date'])).show()


# In[24]:


# add the year extracted from Date as a new column on the original DataFrame
newdf = df.withColumn('Year',year(df['Date']))


# In[29]:


# average Close per year
result = newdf.groupBy('Year').mean().select(['Year','avg(Close)'])


# In[33]:


# rename the column
new = result.withColumnRenamed('avg(Close)','Average Closing Price')


# In[39]:


# format the new column to two decimal places
new.select(['Year',format_number('Average Closing Price',2).alias('avg close')]).show()
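groupBy does not guarantee any output order, so sorting by Year makes the result easier to read (a small usage sketch on the new DataFrame from above):

new.orderBy('Year').select(['Year',format_number('Average Closing Price',2).alias('avg close')]).show()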
 

9 - Spark DataFrame Project Exercise

31 - DataFrame Project Exercise

