手写机器学习算法

梯度下降

# %%
import numpy as np
import matplotlib.pyplot as plt

"""
seed( ) 用于指定随机数生成时所用算法开始的整数值,如果使用相同的seed( )值,则每次生成的随即数都相同,
如果不设置这个值,则系统根据时间来自己选择这个值,此时每次生成的随机数因时间差异而不同
"""
np.random.seed(42)

N = 100  # 维度0, x有几行

# rand函数根据给定维度生成[0,1)之间的数据,包含0,不包含1
# 参数就是生成的维度(N,1) -> N*1
# 返回值为指定维度的array
x = np.random.rand(N, 1) * 5  # 随机x
y = 9.81 * x
# randn函数返回一个或一组样本,具有标准正态分布。
# 参数就是生成的维度(N,1) -> N*1
noise = 2 * np.random.randn(N, 1)
y_obs = y + noise  # 作为随机的y真实值

# %%
plt.scatter(x, y_obs, label="Observations")
plt.plot(x, y, c='r', label="True function")
plt.legend()
plt.show()


# %% 定义辅助函数

# helper functions
# w*x (y=w^t*x+b)
def f(w):
    return w * x


# 损失函数, 将误差平方后求合 / N -> 方差 
def loss_function(e):
    L = np.sum(np.square(e)) / N
    return L


# 计算梯度(导数): &L/&w = 1/N * ∑(2(y-wx)(-x)) = 1/N * ∑(2ξx)
def dL_dw(e, w):
    # mean:求平均
    return -2 * np.mean(e * x)

# w 会更新 -> w_new = w_current - γ * &L/&w ,其中γ是学习率(0~1)

# the actual gradient descent
def gradient_descent(iter=100, gamma=0.1):
    """
    梯度下降
    :param iter: 迭代轮数
    :param gamma: 学习率
    :return: w的历史值,loss的历史值
    """
    # get starting conditions 
    w = 10 * np.random.random() # 随机初始化权重参数

    # 保存参数w的所有历史值
    params = []
    # 生成 iter行1列的 0填充的 数组
    loss = np.zeros((iter, 1))  # 记录损失值
    for i in range(iter):
        params.append(w)
        # 计算误差
        e = y_obs - f(w)  # (y_obs真实值-f(w)预测值)
        loss[i] = loss_function(e)
        # update parameters 更新权重参数
        w_news = w - gamma * dL_dw(e, w)  # 更新w
        w = w_news
    return params, loss


# %% 开始计算
params, loss = gradient_descent()

iter = 100
gamma = 0.1
# 随机初始化权重参数
w = 10 * np.random.randn()
params = [] # w
loss = np.zeros((iter, 1)) # loss(e)
for i in range(iter):
    params.append(w)
    e = y_obs - f(w) # 求误差 (y_true-y_pred)
    loss[i] = loss_function(e)

    w_news = w - gamma * dL_dw(e, w)
    w = w_news

print(dL_dw(e, w))  # -1.0766942892814769e-14
plt.plot(loss)
plt.show()

params = np.array(params)
plt.plot(params)
plt.title('Gradient Descent')
plt.xlabel('w')
plt.show()

# 查看最后一个w,可以看到非常接近9.8
print(params[-1])  # 9.757142247178272

k-means

import matplotlib.pyplot as plt
import numpy as np

# 设置随机数种子
np.random.seed(42)

# %matplotlib inline

# 集群数
K = 3
# 两个维度
D = 2
# 点的总数
N = 1000
# 不同集群的点数
Ns = [300, 400, 300]

# 3行2列 的随机数组(服从正态分布)
# 相当于生成3个二维坐标点 -> [[x1,y1],[x2,y2],[x3,y3]]
means = 5 * np.random.randn(K, D)
print(means)

x = []
# x: ['a','b','c'], y: [1, 2, 3] -> zip(x,y) -> ('a',1), ('b', 2), ('c',3)
# zip(Ns, means) -> [(300,[x1,y1]),(400,[x2,y2]),(300,[x3,y3])]
for n, m in zip(Ns, means):
    print(n, m)
    # 生成 n行2列的 随机数组 -> 相当于n个二维坐标点 -> m是means里的二维坐标(集群中心点)
    x.append(np.random.randn(n, D) + m)

# 此时x的shape(形状)
# [(300, 2), (400, 2), (300, 2)]
print([x_.shape for x_ in x])

# zip -> [((300, 2),[x1,y1]),((400, 2),[x2,y2]),((300, 2),[x3,y3])]
for x_, m in zip(x, means):
    print(x_.shape, m.shape)
    # scatter(x,y) -> 绘制散点图
    # x: n行2列 -> x_[:,0]: 所有行的第一列(xi), x[:,1]: 所有行的第二列(yi)
    plt.scatter(x_[:, 0], x_[:, 1])
    # 绘制集群中心点
    # plt.plot里的这个'kx' -> 颜色字符k: 表示黑色,标记字符x: 表示图案为x标记
    plt.plot(m[0], m[1], 'kx')
plt.title('True Clusters')
plt.show()

# 从x取出所有数据
# np.vstack: 按垂直方向(行顺序)堆叠数组构成一个新的数组,堆叠的数组需要具有相同的维度
# np.hstack: 按水平方向(列顺序)堆叠数组构成一个新的数组,堆叠的数组需要具有相同的维度
data = np.vstack(x)
print(data.shape)
plt.scatter(data[:, 0], data[:, 1])
plt.show()


# 计算欧式距离 √((x1-x2)^2 + (y1-y2)^2)
def dist(x1, x2, axis=None):
    """
    Calculate euclidean distance
    :param x1: 点1 (x1,y1)
    :param x2: 点2 (x2,y2)
    :param axis:
    :return:
    """
    # np.square(x1 - x2) -> (x1-x2)^2 平方
    # np.sqrt 开根号
    return np.sqrt(np.sum(np.square(x1 - x2), axis))


# 计算平均值和任意给定点的距离
def distance_matrix(x, m):
    """
    Calculates the distance from each element x to each element in m.
    :param x: data points
    :param m: possible means
    :return: distance matrix
    """
    # 初始化(用0填充)距离矩阵(len(x)行len(m)列)
    d = np.zeros((len(x), len(m)))
    for i in range(len(x)):
        for j in range(len(m)):
            # 传入x的第i行所有列 x[i] -> (xi,yi)
            # 传入m的第j行所有列 m[j] -> (xm,ym)
            d[i, j] = dist(x[i, :], m[j, :])

    return d


# 测试
# Test out the distance matrix algorithm
x_test = np.array([[0, 1], [1, 0], [0, 0], [1, 1]])
print(distance_matrix(x_test, x_test))


# 使用广播替代循环(python的循环性能低)
# broadcasting
# 计算欧式距离 √((x1-x2)^2 + (y1-y2)^2)
def dist(x1, x2, axis=-1):
    """
    Calculate euclidean distance
    :param x1: 点1 (x1,y1)
    :param x2: 点2 (x2,y2)
    :param axis:
    :return:
    """
    return np.sqrt(np.sum(np.square(x1 - x2), axis))


def distance_matrix_broadcasting(x, m):
    # np.sum(axis=0) -> 行(维度1)求和 -> [[x1,y1],[x2,y2]] -> [[x1+x2],[y1+y2]]
    # np.sum(axis=1) -> 列(维度2)求和 -> [[x1,y1],[x2,y2]] -> [[x1+y1],[x2+y2]]
    # x所有点与m_相减([xx-mx,xy-my])并求平方,然后列相加((xx-mx)^2+(xy-my)^2,然后开根号得距离
    d = [dist(x, m_, axis=1) for m_ in m]
    # print(d)
    # 按行堆叠(axis=1)
    d = np.stack(d, axis=1)
    return d


# 测试
# Test out the distance matrix algorithm
x_test = np.array([[0, 1], [1, 0], [0, 0], [1, 1]])
print(distance_matrix_broadcasting(x_test, x_test))

# 计算k-means

# 初始化数据
# 集群数
k = 3
#
iters = 10
print(data.shape)
print(data)
# (1000, 2)
# 3个集群中心点,(3行2列)
means = np.random.randn(k, data.shape[1])

for i in range(iters):
    # 求data和中心点的距离 -> shape: (1000, 3) -> 这1000个点分别到3个中心点的距离
    d = distance_matrix_broadcasting(data, means)
    # print(d[:10])
    # print(d.shape) # (1000,3)
    # np.argmin: 找到最小的(值的下标)然后映射到指定轴
    '''
    array([[0, 4, 2],
           [3, 1, 5]]) # 3*2
    >>> np.argmin(a, axis=0)
    映射到行x
    找出每列最小元素的下标
    array([0 1 0]) # (3,)
    >>> np.argmin(a, axis=1)
    映射到列y
    找出每行最小元素的下标
    array([0 1]) #(2,)
    '''
    cluster = d.argmin(axis=-1)  # 找到平均距离最小值然后映射到列
    # true
    # print(cluster == np.argmin(d, axis=1))
    # print(cluster[:10])  # [一堆0、1、2]
    # print(cluster.shape) # (1000,)
    for j in range(k):
        # print(j) # 0,1,2
        # 筛选出和第j个集群中心点最近的下标
        idx = cluster == j  # numpy布尔索引
        # print(idx)
        # print(data[idx])
        # 绘制集群中心点
        plt.plot(means[j, 0], means[j, 1], 'rx')
        # data[idx] -> 找出所有cluster==j(与集群中心点j最近)的元素,组成新数组
        plt.scatter(data[idx, 0], data[idx, 1])
        # calculate new mean
        # print('means\n', means[j, :])
        # print('new:\n', data[idx].mean(axis=0))
        # 根据当前平均距离最小的点确定新点
        # 将距离集群中心点j最近的所有点的坐标求平均,得出新点的坐标
        means[j, :] = data[idx].mean(axis=0)
        # print(data[idx])
        # print(means[j, :])
    plt.show()

KNN

# 全部行都能输出
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# 解决坐标轴刻度负号乱码
plt.rcParams['axes.unicode_minus'] = False
# 解决中文乱码问题
plt.rcParams['font.sans-serif'] = ['Simhei']
plt.style.use('ggplot')
# plt.figure(figsize=(2,3),dpi=720)

rowdata = {'颜色深度': [14.13, 13.2, 13.16, 14.27, 13.24, 12.07, 12.43, 11.79, 12.37, 12.04],
           '酒精浓度': [5.64, 4.28, 5.68, 4.80, 4.22, 2.76, 3.94, 3.1, 2.12, 2.6],
           '品种': [0, 0, 0, 0, 0, 1, 1, 1, 1, 1]}
# 0 代表 “黑皮诺”,1 代表 “赤霞珠”
wine_data = pd.DataFrame(rowdata)
print(wine_data)

X = np.array(wine_data.iloc[:, 0:2])  # 我们把特征(酒的属性)放在X
y = np.array(wine_data.iloc[:, -1])  # 把标签(酒的类别)放在Y
# 探索数据,假如我们给出新数据[12.03,4.1] ,你能猜出这杯红酒是什么类别么?
new_data = np.array([12.03, 4.1])
plt.scatter(X[y == 1, 0], X[y == 1, 1], color='red', label='赤霞珠')  # 画出标签y为1的、关于“赤霞珠”的散点
plt.scatter(X[y == 0, 0], X[y == 0, 1], color='purple', label='黑皮诺')  # 画出标签y为0的、关于“黑皮诺”的散点
plt.scatter(new_data[0], new_data[1], color='yellow')  # 新数据点
print(new_data)
plt.xlabel('酒精浓度')
plt.ylabel('颜色深度')
plt.legend(loc='lower right')
plt.savefig('葡萄酒样本.png')
plt.show()

from math import sqrt

distance = [sqrt(np.sum((x - new_data) ** 2)) for x in X]
print(distance)

# 从近到远排序后的原数据点
sort_dist = np.argsort(distance)
print('sort_dist:\n', sort_dist)

k = 3
# 找出离新数据最近的3个点
topK = [y[i] for i in sort_dist[:k]]
print(topK)  # [1,1,0]

# Pandas Series 类似表格中的一个列(column),类似于一维数组,可以保存任何数据类型
# 找出其中数量最多的值
print(pd.Series(topK).value_counts().index[0])  # 1


def KNN(new_data, dataSet, k):
    '''
    函数功能:KNN分类器
    参数说明:
    new_data: 需要预测分类的数据集
    dataSet: 已知分类标签的数据集
    k: k-近邻算法参数,选择距离最小的k个点
    return:
    result: 分类结果
    '''
    from math import sqrt
    from collections import Counter
    import numpy as np
    import pandas as pd
    result = []
    distance = [sqrt(np.sum((x - new_data) ** 2)) for x in
                np.array(dataSet.iloc[:, 0:len(dataSet.columns) - 2])]
    sort_dist = np.argsort(distance)
    topK = [dataSet.iloc[:, -1][i] for i in sort_dist[:k]]
    result.append(pd.Series(topK).value_counts().index[0])
    return result


# 测试函数的运行结果
new_data = np.array([12.03, 4.1])
k = 3
print(KNN(new_data, wine_data, k))  # [1]

决策树

香农熵

  def calEnt(dataSet):
      n = dataSet.shape[0]  # 数据集总行数
      # [[不是, 3], [是, 2]]
      iset = dataSet.iloc[:, -1].value_counts()  # 标签的所有类别
      p = iset / n  # 每一类标签所占比
      ent = (-p * np.log2(p)).sum()  # 计算信息熵
      return ent

  # 计算全体数据的信息熵——根据标签列去进行计算
  # 0.9709505944546686
  calEnt(dataSet)

数据集最佳切分函数

import numpy as np
import pandas as pd
row_data = {'是否陪伴': [0, 0, 0, 1, 1],
            '是否玩游戏': [1, 1, 0, 1, 1],
            '渣男': ['是', '是', '不是', '不是', '不是']}
dataSet = pd.DataFrame(row_data)
# 定义信息熵
def calEnt(dataSet):
    n = dataSet.shape[0]  # 数据集总行数
    iset = dataSet.iloc[:, -1].value_counts()  # 统计标签的所有类别
    p = iset / n  # 统计每一类标签所占比
    ent = (-p * np.log2(p)).sum()  # 计算信息熵
    return ent
# 选择最优的列进行切分
def bestSplit(dataSet):
    baseEnt = calEnt(dataSet)  # 计算原始熵
    bestGain = 0  # 初始化信息增益
    axis = -1  # 初始化最佳切分列,标签列
    for i in range(dataSet.shape[1] - 1):  # 对特征的每一列进行循环
        levels = dataSet.iloc[:, i].value_counts().index  # 提取出当前列的所有取值
        ents = 0  # 初始化子节点的信息熵
        for j in levels:  # 对当前列的每一个取值进行循环
            childSet = dataSet[dataSet.iloc[:, i] == j]  # 某一个子节点的dataframe
            ent = calEnt(childSet)  # 计算某一个子节点的信息熵
            ents += (childSet.shape[0] / dataSet.shape[0]) * ent  # 计算当前列的信息熵
        print('第{}列的信息熵为{}'.format(i, ents))
        infoGain = baseEnt - ents  # 计算当前列的信息增益
        print('第{}列的信息增益为{}\n'.format(i, infoGain))
        if (infoGain > bestGain):
            bestGain = infoGain  # 选择最大信息增益
            axis = i  # 最大信息增益所在列的索引
    print("第{}列为最优切分列".format(axis))
    return axis
print(bestSplit(dataSet))

按照给定列切分数据集

def mySplit(dataSet, axis, value):
  """
  函数功能:按照给定的列划分数据集
  参数说明:
  dataSet:原始数据集
  axis:指定的列索引
  value:指定的属性值
  return:redataSet:按照指定列索引和属性值切分后的数据集
  """
  col = dataSet.columns[axis]
  redataSet = dataSet.loc[dataSet[col] == value, :].drop(col, axis=1)
  return redataSet
# 验证函数:以axis=0,value=1为例
print(mySplit(dataSet, 0, 1))


  转载请注明: malred-blog 手写机器学习算法

  目录