# SVM based on numpy
# by WangYIzhang
# referring to https://jonchar.net/notebooks/SVM/
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs, make_circles, make_moons
from sklearn.preprocessing import StandardScaler
class SMOModel:
'''
Container object for the model used for sequential minimal optimization.
'''
def __init__(self, X, y, C, kernel, alphas, b, errors) -> None:
        self.X = X              # training data matrix
        self.y = y              # class labels (+1 / -1)
        self.C = C              # regularization parameter
        self.kernel = kernel    # kernel function
        self.alphas = alphas    # Lagrange multipliers
        self.b = b              # scalar threshold (bias)
        self.errors = errors    # error cache
        self._obj = []          # record of objective function values
        self.m = len(self.X)    # number of training samples
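# Example (illustrative; the data below is made up and `linear_kernel` is
# defined later in this file):
# m_demo = 4
# X_demo = np.random.randn(m_demo, 2)
# y_demo = np.array([1., 1., -1., -1.])
# model_demo = SMOModel(X_demo, y_demo, C=1.0, kernel=linear_kernel,
#                       alphas=np.zeros(m_demo), b=0.0, errors=np.zeros(m_demo))
# # Before training, the error cache is typically re-initialized to
# # decision_function(...) - y_demo.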
# Kernel functions
def linear_kernel(x, y, b=1):
    '''
    Returns the linear kernel K(x, y) = x @ y.T + b evaluated on the inputs
    Args:
        x, y: input vectors or matrices
        b: bias term, defaults to 1
    '''
    return x @ y.T + b
def gaussian_kernel(x, y, sigma=1):
    '''
    Returns the Gaussian (RBF) kernel K(x, y) = exp(-||x - y||^2 / (2*sigma^2))
    evaluated on the inputs
    Args:
        x, y: input vectors or matrices
        sigma: kernel width, defaults to 1
    '''
    if np.ndim(x) == 1 and np.ndim(y) == 1:
        # vector vs. vector -> scalar
        result = np.exp(- (np.linalg.norm(x - y, 2)) ** 2 / (2 * sigma ** 2))
    elif (np.ndim(x) > 1 and np.ndim(y) == 1) or (np.ndim(x) == 1 and np.ndim(y) > 1):
        # matrix vs. vector -> one value per row
        result = np.exp(- (np.linalg.norm(x - y, 2, axis=1) ** 2) / (2 * sigma ** 2))
    elif np.ndim(x) > 1 and np.ndim(y) > 1:
        # matrix vs. matrix -> full Gram matrix via broadcasting
        result = np.exp(- (np.linalg.norm(x[:, np.newaxis] - y[np.newaxis, :], 2, axis=2) ** 2) / (2 * sigma ** 2))
    return result
# # Test the kernel functions
# x_len, y_len = 5, 10
# print(linear_kernel(np.random.randn(x_len, 1), np.random.randn(y_len, 1)).shape == (x_len, y_len))
# print(gaussian_kernel(np.random.randn(x_len, 1), np.random.randn(y_len, 1)).shape == (x_len, y_len))
# Dual objective function
def objective_function(alphas, target, kernel, X_train):
    '''
    Returns the SVM dual objective:
        sum_i(alpha_i) - 0.5 * sum_i sum_j (y_i * y_j * K(x_i, x_j) * alpha_i * alpha_j)
    Args:
        alphas: np.array, Lagrange multipliers
        target: class labels
        kernel: kernel function
        X_train: training data
    '''
    # Indexing with None adds an axis, same as np.newaxis
    return np.sum(alphas) - 0.5 * np.sum((target[:, None] * target[None, :]) * kernel(X_train, X_train) * (alphas[:, None] * alphas[None, :]))
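# Quick sanity check (illustrative, made-up data): with all alphas at zero the
# dual objective evaluates to exactly 0 for any data and kernel.
# X_chk = np.random.randn(6, 2)
# y_chk = np.array([1., -1., 1., -1., 1., -1.])
# print(objective_function(np.zeros(6), y_chk, linear_kernel, X_chk))  # 0.0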
# Decision function
def decision_function(alphas, target, kernel, X_train, X_test, b):
    '''
    Applies the SVM decision function
        f(x) = sum_i(alpha_i * y_i * K(x_i, x)) - b
    to the feature vectors in X_test, returning one value per test point
    Args:
        alphas: np.array, Lagrange multipliers
        target: class labels
        kernel: kernel function
        X_train: training data
        X_test: test data
        b: threshold
    '''
    return (alphas * target) @ kernel(X_train, X_test) - b
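# Usage (illustrative; X_tr, y_tr, alphas, b are assumed to come from a
# trained model): `scores` holds one decision value per row of X_te, and its
# sign is the predicted class.
# scores = decision_function(alphas, y_tr, gaussian_kernel, X_tr, X_te, b)
# y_pred = np.sign(scores)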
# Plot the decision boundary
def plot_decision_boundary(model, ax, resolution=100, colors=('b', 'k', 'r'), levels=(-1, 0, 1)):
"""Plots the model's decision boundary on the input axes object.
Range of decision boundary grid is determined by the training data.
Returns decision boundary grid and axes object (`grid`, `ax`)."""
# Generate coordinate grid of shape [resolution x resolution]
# and evaluate the model over the entire space
xrange = np.linspace(model.X[:,0].min(), model.X[:,0].max(), resolution)
yrange = np.linspace(model.X[:,1].min(), model.X[:,1].max(), resolution)
grid = [[decision_function(model.alphas, model.y,
model.kernel, model.X,
np.array([xr, yr]), model.b) for xr in xrange] for yr in yrange]
    grid = np.array(grid)  # already (resolution, resolution); rows follow yrange
# Plot decision contours using grid and
# make a scatter plot of training data
ax.contour(xrange, yrange, grid, levels=levels, linewidths=(1, 1, 1),
linestyles=('--', '-', '--'), colors=colors)
ax.scatter(model.X[:,0], model.X[:,1],
c=model.y, cmap=plt.cm.viridis, lw=0, alpha=0.25)
# Plot support vectors (non-zero alphas)
# as circled points (linewidth > 0)
mask = np.round(model.alphas, decimals=2) != 0.0
ax.scatter(model.X[mask,0], model.X[mask,1],
c=model.y[mask], cmap=plt.cm.viridis, lw=1, edgecolors='k')
return grid, ax
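# Usage (illustrative; `trained_model` stands in for an SMOModel that has
# already been optimized with the SMO routine below):
# fig, ax = plt.subplots()
# grid, ax = plot_decision_boundary(trained_model, ax)
# plt.show()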
'''
SMO algorithm
The model is trained with three functions, take_step(), examine_example() and
train():
1. train() uses heuristics to pick the first alpha to optimize and passes its
   index to examine_example().
2. examine_example() uses heuristics to pick the second alpha and passes both
   indices to take_step().
3. take_step() does the actual optimization: it computes the two new alpha
   values and a new threshold b, and updates the error cache.
train() sweeps over the alphas in a few different ways inside a while loop
until no further optimization is possible, then returns the optimized alpha
vector (embedded in an SMOModel object).
'''
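# Numerical tolerances used by the SMO routines below (module-level constants
# mirroring the referenced notebook; adjust as needed)
tol = 0.01  # error tolerance for examine_example()
eps = 0.01  # alpha-change tolerance for take_step()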
def take_step(i1, i2, model):
# Skip if chosen alphas are the same
if i1 == i2:
return 0, model
alph1 = model.alphas[i1]
alph2 = model.alphas[i2]
y1 = model.y[i1]
y2 = model.y[i2]
E1 = model.errors[i1]
E2 = model.errors[i2]
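    # s encodes whether the two labels agree (s = +1) or differ (s = -1)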
s = y1 * y2
# Compute L & H, the bounds on new possible alpha values
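    # The bounds follow from the box constraint 0 <= alpha <= C combined with
    # the equality constraint a1*y1 + a2*y2 = constant (Platt, 1998)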
if (y1 != y2):
L = max(0, alph2 - alph1)
H = min(model.C, model.C + alph2 - alph1)
elif (y1 == y2):
L = max(0, alph1 + alph2 - model.C)
H = min(model.C, alph1 + alph2)
if (L == H):
return 0, model
# Compute kernel & 2nd derivative eta
k11 = model.kernel(model.X[i1], model.X[i1])
k12 = model.kernel(model.X[i1], model.X[i2])
k22 = model.kernel(model.X[i2], model.X[i2])
eta = 2 * k12 - k11 - k22
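    # eta is the second derivative of the objective along the constraint line;
    # for a positive semi-definite kernel eta <= 0, and eta < 0 guarantees a
    # unique maximum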
# Compute new alpha 2 (a2) if eta is negative
if (eta < 0):
a2 = alph2 - y2 * (E1 - E2) / eta
        # Clip a2 to the feasible interval [L, H]
        a2 = min(max(a2, L), H)
# If eta is non-negative, move new a2 to bound with greater objective function value
else:
alphas_adj = model.alphas.copy()
alphas_adj[i2] = L
# objective function output with a2 = L
Lobj = objective_function(alphas_adj, model.y, model.kernel, model.X)
alphas_adj[i2] = H
# objective function output with a2 = H
Hobj = objective_function(alphas_adj, model.y, model.kernel, model.X)
if Lobj > (Hobj + eps):
a2 = L
elif Lobj < (Hobj - eps):
a2 = H
else:
a2 = alph2
# Push a2 to 0 or C if very close
if a2 < 1e-8:
a2 = 0.0
elif a2 > (model.C - 1e-8):
a2 = model.C
# If examples can't be optimized within epsilon (eps), skip this pair
if (np.abs(a2 - alph2) < eps * (a2 + alph2 + eps)):
return 0, model
# Calculate new alpha 1 (a1)
a1 = alph1 + s * (alph2 - a2)
# Update threshold b to reflect newly calculated alphas
# Calculate both possible thresholds
b1 = E1 + y1 * (a1 - alph1) * k11 + y2 * (a2 - alph2) * k12 + model.b
b2 = E2 + y1 * (a1 - alph1) * k12 + y2 * (a2 - alph2) * k22 + model.b
# Set new threshold based on if a1 or a2 is bound by L and/or H
    if 0 < a1 < model.C:
        b_new = b1
    elif 0 < a2 < model.C:
        b_new = b2
# Average thresholds if both are bound
else:
b_new = (b1 + b2) * 0.5
# Update model object with new alphas & threshold
model.alphas[i1] = a1
model.alphas[i2] = a2
# Update error cache
# Error cache for optimized alphas is set to 0 if they're unbound
for index, alph in zip([i1, i2], [a1, a2]):
if 0.0 < alph < model.C:
model.errors[index] = 0.0
# Set non-optimized errors based on equation 12.11 in Platt's book
non_opt = [n for n in range(model.m) if (n != i1 and n != i2)]
    model.errors[non_opt] = model.errors[non_opt] + \
                            y1 * (a1 - alph1) * model.kernel(model.X[i1], model.X[non_opt]) + \
                            y2 * (a2 - alph2) * model.kernel(model.X[i2], model.X[non_opt]) + \
                            model.b - b_new
    # Update model threshold
    model.b = b_new
    return 1, model