import math
import numpy as np
import torch
from torch import nn
from d2l import torch as d2l
- Generate the dataset (the noise term follows a normal distribution with mean 0 and standard deviation 0.1)
# Maximum degree of the polynomial
max_degree = 20
# Sizes of the training and test sets
n_train, n_test = 100, 100
# Allocate space for all 20 coefficients
true_w = np.zeros(max_degree)
# Only the first four entries are the true weights of the polynomial model;
# the remaining 16 are zero and act as irrelevant dimensions
true_w[0:4] = np.array([5, 1.2, -3.4, 5.6])
# Sample features of shape (n_train + n_test, 1) from a standard normal
# (mean 0, variance 1)
features = np.random.normal(size=(n_train + n_test, 1))
# Shuffle the samples
np.random.shuffle(features)
# Broadcast each sample x against the exponents [0, 1, ..., 19] to build a
# (200, 20) matrix whose rows are [x^0, x^1, x^2, ..., x^19];
# column i therefore holds the i-th power of features
poly_features = np.power(features, np.arange(max_degree).reshape(1, -1))
# Rescale column i by i! to keep the magnitudes of the high powers in check
for i in range(max_degree):
    poly_features[:, i] /= math.gamma(i + 1)  # gamma(n) = (n-1)!, so gamma(i+1) = i!
# (200, 20) dot (20,) ==> (200,)
labels = np.dot(poly_features, true_w)
# Add Gaussian noise with standard deviation 0.1
labels += np.random.normal(scale=0.1, size=labels.shape)
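Concretely, each label is drawn as y = 5 + 1.2x - 3.4x^2/2! + 5.6x^3/3! + ε with ε ~ N(0, 0.1²). A quick sanity check (a minimal sketch, assuming the NumPy arrays above are still in scope; `x` and `y_clean` are illustrative names):

# Recompute the clean cubic and compare against the noisy labels
x = features[:, 0]
y_clean = 5 + 1.2 * x - 3.4 * x**2 / 2 + 5.6 * x**3 / 6
# Residuals should be on the order of the 0.1-std noise
assert np.allclose(labels, y_clean, atol=0.6)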
- Train and test the model
class Accumulator:
    """Accumulate running sums over n variables."""
    def __init__(self, n):
        self.data = [0.0] * n
    def add(self, *args):
        self.data = [a + float(b) for a, b in zip(self.data, args)]
    def reset(self):
        self.data = [0.0] * len(self.data)
    def __getitem__(self, idx):
        return self.data[idx]
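For instance, two slots can track a running loss sum and a running sample count (a toy illustration):

metric = Accumulator(2)       # [running loss sum, running sample count]
metric.add(1.5, 10)
metric.add(2.5, 10)
print(metric[0] / metric[1])  # average loss so far: 0.2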
def evaluate_loss(net, data_iter, loss):
    """Evaluate the average loss of a model on the given dataset."""
    metric = Accumulator(2)  # sum of losses, number of examples
    for X, y in data_iter:
        out = net(X)
        y = y.reshape(out.shape)
        # With reduction='none', loss returns one entry per example
        l = loss(out, y)
        # Accumulate the batch loss sum and the batch size
        metric.add(l.sum(), l.numel())
    # Return the average loss per example
    return metric[0] / metric[1]
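A minimal smoke test for evaluate_loss, using hypothetical toy batches rather than the real data iterator (`_X`, `_net`, `_batches` are illustrative names):

_X = torch.arange(4, dtype=torch.float).reshape(-1, 1)
_net = nn.Linear(1, 1, bias=False)  # untrained single-weight model
_batches = [(_X[:2], 2 * _X[:2]), (_X[2:], 2 * _X[2:])]  # two mini-batches of y = 2x
print(evaluate_loss(_net, _batches, nn.MSELoss(reduction='none')))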
# Convert everything from NumPy arrays to PyTorch tensors
true_w, features, poly_features, labels = [
    torch.tensor(x, dtype=torch.float)
    for x in [true_w, features, poly_features, labels]]
# Peek at the first two samples
features[:2], poly_features[:2, :], labels[:2]
def accuracy(y_hat, y):
    """Count the number of correct predictions (a classification metric,
    kept only because train_epoch_ch3 below expects it)."""
    if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
        y_hat = y_hat.argmax(axis=1)
    cmp = y_hat.type(y.dtype) == y
    return float(cmp.type(y.dtype).sum())
def train_epoch_ch3(net, data_iter, loss, updater):
    """Train the model for one epoch (chapter 3 style trainer)."""
    if isinstance(net, torch.nn.Module):
        net.train()  # switch to training mode
    # Sum of training loss, sum of accuracy, number of examples
    metric = Accumulator(3)
    for X, y in data_iter:
        out = net(X)
        l = loss(out, y)
        if isinstance(updater, torch.optim.Optimizer):
            # Built-in optimizer: backprop the mean of the per-example losses
            updater.zero_grad()
            l.mean().backward()
            updater.step()
        else:
            # Custom updater: backprop the loss sum and pass the batch size
            l.sum().backward()
            updater(X.shape[0])
        metric.add(float(l.sum()), accuracy(out, y), y.numel())
    # Return the average loss and average accuracy over the epoch
    return metric[0] / metric[2], metric[1] / metric[2]
def train(train_features, test_features, train_labels, test_labels,
          num_epochs=400):
    loss = nn.MSELoss(reduction='none')
    input_shape = train_features.shape[-1]
    # No bias term: the constant is already supplied by the x^0/0! column
    net = nn.Sequential(nn.Linear(input_shape, 1, bias=False))
    batch_size = min(10, train_labels.shape[0])
    train_iter = d2l.load_array((train_features, train_labels.reshape(-1, 1)),
                                batch_size)
    test_iter = d2l.load_array((test_features, test_labels.reshape(-1, 1)),
                               batch_size, is_train=False)
    trainer = torch.optim.SGD(net.parameters(), lr=0.01)
    animator = d2l.Animator(xlabel='epoch', ylabel='loss', yscale='log',
                            xlim=[1, num_epochs], ylim=[1e-3, 1e2],
                            legend=['train', 'test'])
    for epoch in range(num_epochs):
        train_epoch_ch3(net, train_iter, loss, trainer)
        # Plot train/test loss at epoch 0 and then every 20 epochs
        if epoch == 0 or (epoch + 1) % 20 == 0:
            animator.add(epoch + 1, (evaluate_loss(net, train_iter, loss),
                                     evaluate_loss(net, test_iter, loss)))
    print('weight:', net[0].weight.data.numpy())
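Note that the network itself is plain linear regression on whichever polynomial feature columns we pass in, so model complexity in the three experiments below is controlled entirely by the number of columns.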
- Fitting a third-order polynomial (normal)
# Use only the first four feature columns: 1, x, x^2/2!, x^3/3!
train(poly_features[:n_train, :4], poly_features[n_train:, :4],
      labels[:n_train], labels[n_train:], num_epochs=400)
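These four columns match the data-generating process exactly, so both training and test loss should drop close to the noise floor (MSE ≈ 0.01, the variance of the injected noise), and the printed weights should approach the true values [5, 1.2, -3.4, 5.6].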
- Linear function (underfitting)
# Use only the first two feature columns: 1 and x
train(poly_features[:n_train, :2], poly_features[n_train:, :2],
      labels[:n_train], labels[n_train:], num_epochs=400)
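A linear model cannot represent the quadratic and cubic terms, so after the first few epochs both losses should plateau well above the noise floor: a classic sign of underfitting.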
- High-order polynomial (overfitting)
# Use all 20 feature columns, with only 100 training samples
train(poly_features[:n_train, :], poly_features[n_train:, :],
      labels[:n_train], labels[n_train:], num_epochs=1500)
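With all 20 columns but only 100 training examples, there is too little data to pin down the 16 spurious coefficients: the training loss should keep decreasing while the test loss stays noticeably higher, which is overfitting.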