本帖最后由 小谢 于 2016-1-17 02:16 编辑
本章概况: 首先介绍了如何求解数据集的最佳分割线,然后实现了一个简化版的SMO算法,简化版算法的运行是没有什么问题的,但是在更大的数据集上运行速度就会变慢,接着又实现了一个完整版的SMO算法,这两个版本中alpha的更改和代数运算的优化环节一模一样,只是在选择alpha的时候方式不同,但是却大大提高了运行速度。然后又采用核函数实现了复杂数据的分类,最后通过手写识别的例子验证了基于核函数的SMO算法。
寻找最大间隔:
B、C、D三条直线都能将数据集分割开,但哪条是最好的呢?引申的问题是:如何求解数据集的最佳分割直线?
在求解数据集的最佳分割直线之前,先来了解几个概念: 1、点到分割面的距离称为间隔 2、支持向量就是离分割超平面最近的那些点 3、分割超平面的形式可以写成
4、如下图,要计算点 A到分割超平面的距离,就必须给出点到分割面的法线或垂线的长度,该值为
利用海维赛德阶跃函数对
作用得到
,其中当u<0时f(u)输出-1,反之则输出+1。
前面提到支持向量就是离分割超平面最近的那些点,也就是最小间隔。对最小间隔最大化可以写作:
上述优化问题中,给定了一些约束条件然后求最优值。这里的约束条件是:
,然后通过拉格朗日乘子法求解,于是优化目标函数可以写成:其约束条件为: 但是这里有个假设,就是数据必须100%线性可分,但我们知道几乎所有数据都不那么干净。这时可以通过松弛变量来允许有些数据点可以处于分割面的错误一端。 此时约束条件为: 常数C用于控制最大化间隔和保证大部分点的函数间隔小于1.0,在优化算法的实现代码中,常数C是一个参数,通过调节该参数得到不同的结果。一旦求出所有的alpha,那么分割超平面就可以通过这些alpha来表达。
SMO算法: SMO表示序列最小化。SMO算法是将最大优化问题分解为多个小优化问题来求解的。这些小优化问题往往很容易求解,并且将它们进行顺序求解的结果与将它们作为整体求解的结果是完全一致的。 SMO的目标是求出一系列alpha和b,一旦求出这些alpha,就很容易求出权重向量w并得到分割超平面。 SMO算法的工作原理是:每次循环中选择两个alpha进行优化处理。一旦找到一对合适的alpha,那么就增大其中一个同时减少另一个。是否合适需要满足两个条件:一,这两个alpha必须要再间隔边界之外,二,这两个alpha还没有进行过区间化处理或者不在边界上。
SMO的代码实现,首先构建一个辅助函数,用于在某个区间范围内随机选择一个整数。 SMO简化版python代码如下: - #函数的作用是打开文件并进行逐行解析,从而得到每行的类标签和整个数据矩阵
- def loadDataSet(fileName):
- dataMat= [];labelMat=[]
- fr = open(fileName)
- for line in fr.readlines():
- lineArr = line.strip().split('\t')
- dataMat.append([float(lineArr[0]),float(lineArr[1])])
- labelMat.append(float(lineArr[2]))
- return dataMat,labelMat
复制代码
- #辅助函数,用于在某个区间范围内随机选择一个整数
- def selectJrand(i,m):
- j=i
- while (j==i):
- j=int(random.uniform(0,m))
- return j
复制代码- #辅助函数,用于调整大于H或小于L的alpha的值
- def clipAlpha(aj,H,L):
- if aj > H:
- aj = H
- if L > aj:
- aj = L
- return aj
复制代码
- #简化版SMO算法
- #终于把这个超长的方法敲完了,下面来好好理解下吧
- #五个参数,分别是:数据集、类别标签、常数C、容错率和退出前最大的循环次数
- def smoSimple(dataMatIn,classLabels,C,toler,maxIter):
- # 转化为numpy矩阵;转置类别标签
- dataMatrix = mat(dataMatIn);labelMat = mat(classLabels).transpose()
- b = 0; m,n = shape(dataMatrix)
- # 构建alpha列矩阵,矩阵中元素都初始化为0
- alphas = mat(zeROS((m,1)))
- iter = 0
- while (iter < maxIter):
- alphaPairsChanged = 0 #该变量用于记录alphas是否已经进行优化
- for i in range(m):
- #计算预测类别fXi
- fXi = float(multiply(alphas,labelMat).T*(dataMatrix*dataMatrix[i,:].T)) + b
- #与真实结果比对,求得误差Ei
- Ei = fXi - float(labelMat)
- #如果误差很大,可以对数据实例所对应的alpha值进行优化
- #不管是正间隔还是负间隔都会被测试,同时检查alphas保证其不能等于0或C(因为后面alpha小于0或大于C时被调整为0或C,所以一旦在该if语句中它们等于这两个值的话,它们就已经在边 界上了,因而不能再减少或增大)
- if((labelMat*Ei < -toler) and (alphas < C)) or ((labelMat*Ei > toler) and (alphas > 0)):
- #利用辅助函数来随机选择第二个alpha值
- j = selectJrand(i,m)
- fXj = float(multiply(alphas,labelMat).T*(dataMatrix*dataMatrix[j,:].T)) + b
- Ej = fXj - float(labelMat[j])
- alphaIold = alphas.copy();
- alphaJold = alphas[j].copy();
- #计算L和H,用于将alpha[j]调整到0到C之间,如果L==H就不做任何改变
- if (labelMat != labelMat[j]):
- L = max(0,alphas[j] - alphas)
- H = min(C,C + alphas[j] - alphas)
- else:
- L = max(0,alphas[j] + alphas - C)
- H = min(C,alphas[j] + alphas)
- if L == H:print "L==H";continue
- #eta是alpha[j]的最优修改量
- eta = 2.0 * dataMatrix[i,:]*dataMatrix[j,:].T - dataMatrix[i,:]*dataMatrix[i,:].T - dataMatrix[j,:]*dataMatrix[j,:].T
- # 如果eta为0则退出当前迭代过程
- if eta >=0: print "eta>=0";continue
- alphas[j] -=labelMat[j]*(Ei - Ej)/eta
- alphas[j] = clipAlpha(alphas[j],H,L)
- if (abs(alphas[j] - alphaJold) <0.00001): print "j not moving enough"; continue
- alphas +=labelMat[j]*labelMat*(alphaJold - alphas[j])
- #对alpha和alpha[j]进行优化之后,给这两个alpha值设置一个常数项b
- b1 = b - Ei - labelMat * (alphas - alphaIold)*dataMatrix[i,:]*dataMatrix[i,:].T - labelMat[j]*(alphas[j]-alphaJold)*dataMatrix[i,:]*dataMatrix[j,:].T
- b2 = b - Ej - labelMat *(alphas-alphaIold)*dataMatrix[i,:]*dataMatrix[j,:].T-labelMat[j]*(alphas[j]-alphaJold)*dataMatrix[j,:]*dataMatrix[j,:].T
- if (0 <alphas) and (C > alphas):b = b1
- elif (0 < alphas[j]) and (C > alphas[j]) : b = b2
- else : b =(b1+b2)/2.0
- alphaPairsChanged +=1
- print "iter : %d i:%d, pairs changed %d"%(iter,i,alphaPairsChanged)
- if (alphaPairsChanged == 0):iter += 1
- else: iter = 0
- print "iteration number: %d"% iter
- return b,alphas
复制代码
运行简化版SMO算法 $ python >>> b,alphas=svmMLiA.smoSimple(dataArr,labelArr,0.6,0.001,40) >>> b >>> alphas[alphas>0] matrix([[ 1.19116082e-01, 2.60208521e-18, 2.28573383e-01, 1.30104261e-18, 3.46288170e-03, 3.44226583e-01]])
得到那些数据点是支持向量 >>> for i in range(100): ... if alphas>0.0:print dataArr,labelArr ... [4.658191, 3.507396] -1.0 [7.286357, 0.251077] 1.0 [3.457096, -0.082216] -1.0 [6.960661, -0.245353] 1.0 [5.286862, -2.358286] 1.0 [6.080573, 0.418886] 1.0
SMO完整版算法python: - #完整版SMO的支持函数
- class optStructk:
- def __init__(self,dataMatIn,classLabels,C,toler):
- self.X = dataMatIn
- self.labelMat=classLabels
- self.C =C
- self.tol = toler
- self.m = shape(dataMatIn)[0]
- self.alphas = mat(zeros((self.m,1)))
- self.b = 0
- #设置误差缓存
- self.eCache = mat(zeros((self.m,2)))
复制代码
- #内循环中的启发式方法
- def calcEkk(oS,k):
- fXk = float(multiply(oS.alphas,oS.labelMat).T*(oS.X*oS.X[k,:].T)) + oS.b
- Ek = fXk - float(oS.labelMat[k])
- return Ek
复制代码
- def selectJk(i,oS,Ei):
- maxK = -1;maxDeltaE = 0;Ej =0
- #将输入值Ei在缓存中设置成为有效的
- oS.eCache = [1,Ei]
- #构建一个非零列表为目录的alpha值,该列表包含以输入列表为目标的列表值
- validEcacheList = nonzero(oS.eCache[:,0].A)[0]
- if (len(validEcacheList))>1:
- for k in validEcacheList:
- if k==i:continue
- Ek = calcEkk(oS,k)
- deltaE = abs(Ei - Ek)
- if (deltaE > maxDeltaE):
- # 选择具有最大步长的j
- maxK=k;maxDeltaE=deltaE;Ej=Ek
- return maxK,Ej
- else:
- j=selectJrand(i,oS.m)
- Ej= calcEkk(oS,j)
- return j,Ej
复制代码
- def updateEkk(oS,k):
- Ek = calcEkk(oS,k)
- oS.eCache[k] = [1,Ek]
复制代码
- # SMO算法中的优化例程
- def innerLk(i,oS):
- Ei=calcEkk(oS,i)
- if((oS.labelMat*Ei < -oS.tol) and (oS.alphas < oS.C)) or ((oS.labelMat*Ei > oS.tol) and (oS.alphas >0)):
- j,Ej = selectJk(i,oS,Ei)
- alphaIold = oS.alphas.copy(); alphaJold = oS.alphas[j].copy();
- if (oS.labelMat != oS.labelMat[j]):
- L = max(0,oS.alphas[j] - oS.alphas)
- H = min(oS.C,oS.C + oS.alphas[j] - oS.alphas)
- else:
- L = max(0,oS.alphas[j]+oS.alphas - oS.C)
- H = min(oS.C,oS.alphas[j] +oS.alphas)
- if L==H:print "L==H"; return 0
- eta = 2.0 * oS.X[i,:]*oS.X[j,:].T - oS.X[i,:]*oS.X[i,:].T - oS.X[j,:]*oS.X[j,:].T
- if eta >=0: print "eta>=0"; return 0
- oS.alphas[j] -=oS.labelMat[j]*(Ei-Ej)/eta
- oS.alphas[j] = clipAlpha(oS.alphas[j],H,L)
- updateEkk(oS,j) #更新误差缓存
- if(abs(oS.alphas[j] - alphaJold) < 0.00001):
- print "j not moving enough"; return 0
- oS.alphas +=oS.labelMat[j]*oS.labelMat*(alphaJold - oS.alphas[j])
- updateEkk(oS,i) #更新误差缓存
- b1 = oS.b - Ei -oS.labelMat*(oS.alphas-alphaIold)*oS.X[i,:]*oS.X[i,:].T -oS.labelMat[j]*(oS.alphas[j]-alphaJold)*oS.X[i,:]*oS.X[j,:].T
- b2 = oS.b - Ei -oS.labelMat*(oS.alphas-alphaIold)*oS.X[i,:]*oS.X[j,:].T -oS.labelMat[j]*(oS.alphas[j]-alphaJold)*oS.X[j,:]*oS.X[j,:].T
- if(0 < oS.alphas) and (oS.C > oS.alphas):oS.b = b1
- elif(0 <oS.alphas[j]) and (oS.C > oS.alphas[j]):oS.b =b2
- else : oS.b = (b1+b2)/2.0
- return 1
- else: return 0
复制代码- #SMO完整版
- #完整版与简陋版的区别
- #简化版是通过随机的方式选择alpha对(这种方式虽然也可以工作但效果不如完整版的好),完整办覆盖了整个数据集
- def smoPk(dataMatIn,classLabels,C,toler,maxIter,kTup=('lin',0)):
- oS = optStructk(mat(dataMatIn),mat(classLabels).transpose(),C,toler)
- iter =0
- entireSet = True; alphaPairsChanged = 0
- while (iter < maxIter) and ((alphaPairsChanged>0) or (entireSet)):
- alphaPairsChanged = 0
- if entireSet:
- for i in range(oS.m):
- alphaPairsChanged +=innerLk(i,oS)
- print "fullSet, iter: %d i:%d,pairs changed %d"%(iter,i,alphaPairsChanged)
- iter +=1
- else :
- nonBoundIs = nonzero((oS.alphas.A>0) * (oS.alphas.A < C))[0]
- for i in nonBoundIs:
- alphaPairsChanged += innerLk(i,oS)
- print "non-bound,iter:%d i %d,pairs changed %d" %(iter,i,alphaPairsChanged)
- iter +=1
- if entireSet : entireSet =False
- elif (alphaPairsChanged == 0) : entireSet =True
- print "iteration number:%d" % iter
- return oS.b,oS.alphas
复制代码
测试分类器效果 >>> ws=svmMLiA.calcWs(laphas,dataArr,labelArr) >>> ws array([[ 0.7282993 ], [-0.15563093]])
对第一个数据分类,如果该值的结果大于0,那么其属于1类,如果该值小于0,那么属于-1类,对于数据点0,应该得到的类别标签是-1 >>> from numpy import * >>> dataMat = mat(dataArr) >>> dataMat[0]*mat(ws) +b matrix([[-0.92555695]])
验证结果的正确性: >>> labelArr[0] -1.0
核函数: 利用核函数将如下非线性可分的数据转换成分类器理解的形式。对圆中的数据进行某种形式的转换,从而得到某些新的变量来表示数据,这种表示更容易得到大于0或小于0的测试结果。这就需要将数据从一种特征空间转换到另一种特征空间即映射。 SVM优化一个特别好的地方就是,所有的运算都可以写成内积内积的形式,可以把内积替换成核函数。
python代码如下: - #核转换函数
- #X描述所有核函数类型的一个字符串
- def kernelTrans(X,A,kTup):
- m,n = shape(X)
- K = mat(zeros((m,1)))
- if kTup[0]=='lin' : K=X * A.T
- elif kTup[0]=='rbf':
- for j in range(m):
- deltaRow = X[j,:] - A
- K[j] = deltaRow * deltaRow.T
- K = exp(K /(-1*kTup[1]**2))
- else : raise NameError('Houston We have a Problem -- That Kernel is not recognized')
- return K
复制代码
- class optStruct:
- # kTup是一个包含核函数信息的元组
- def __init__(self,dataMatIn,classLabels,C,toler,kTup):
- self.X=dataMatIn
- self.labelMat = classLabels
- self.C = C
- self.tol = toler
- self.m = shape(dataMatIn)[0]
- self.alphas = mat(zeros((self.m,1)))
- self.b = 0
- self.eCache = mat(zeros((self.m,2)))
- self.K = mat(zeros((self.m,self.m)))
- for i in range(self.m):
- self.K[:,i] = kernelTrans(self.X,self.X[i,:],kTup)
复制代码
- #核函数内循环中的启发式方法
- def calcEk(oS,k):
- fXk = float(multiply(oS.alphas,oS.labelMat).T*oS.K[:,k] + oS.b)
- Ek = fXk - float(oS.labelMat[k])
- return Ek
复制代码
- def selectJ(i,oS,Ei):
- maxK = -1;maxDeltaE = 0;Ej =0
- #将输入值Ei在缓存中设置成为有效的
- oS.eCache[i] = [1,Ei]
- #构建一个非零列表为目录的alpha值,该列表包含以输入列表为目标的列表值
- validEcacheList = nonzero(oS.eCache[:,0].A)[0]
- if (len(validEcacheList))>1:
- for k in validEcacheList:
- if k==i:continue
- Ek = calcEk(oS,k)
- deltaE = abs(Ei - Ek)
- if (deltaE > maxDeltaE):
- # 选择具有最大步长的j
- maxK=k;maxDeltaE=deltaE;Ej=Ek
- return maxK,Ej
- else:
- j=selectJrand(i,oS.m)
- Ej= calcEk(oS,j)
- return j,Ej
复制代码- def updateEk(oS,k):
- Ek = calcEk(oS,k)
- oS.eCache[k] = [1,Ek]
复制代码- # SMO算法中的优化例程
- def innerL(i,oS):
- Ei=calcEk(oS,i)
- if((oS.labelMat[i]*Ei < -oS.tol) and (oS.alphas[i] < oS.C)) or ((oS.labelMat[i]*Ei > oS.tol) and (oS.alphas[i] >0)):
- j,Ej = selectJ(i,oS,Ei)
- alphaIold = oS.alphas[i].copy(); alphaJold = oS.alphas[j].copy();
- if (oS.labelMat[i] != oS.labelMat[j]):
- L = max(0,oS.alphas[j] - oS.alphas[i])
- H = min(oS.C,oS.C + oS.alphas[j] - oS.alphas[i])
- else:
- L = max(0,oS.alphas[j]+oS.alphas[i] - oS.C)
- H = min(oS.C,oS.alphas[j] +oS.alphas[i])
- if L==H:print "L==H"; return 0
- eta = 2.0 * oS.K[i,j] -oS.K[i,i] - oS.K[j,j]
- if eta >=0: print "eta>=0"; return 0
- oS.alphas[j] -=oS.labelMat[j]*(Ei-Ej)/eta
- oS.alphas[j] = clipAlpha(oS.alphas[j],H,L)
- updateEk(oS,j) #更新误差缓存
- if(abs(oS.alphas[j] - alphaJold) < 0.00001):
- print "j not moving enough"; return 0
- oS.alphas[i] +=oS.labelMat[j]*oS.labelMat[i]*(alphaJold - oS.alphas[j])
- updateEk(oS,i) #更新误差缓存
- b1 = oS.b - Ei -oS.labelMat[i]*(oS.alphas[i] - alphaIold)*oS.K[i,i] - oS.labelMat[j]*(oS.alphas[j]-alphaJold)*oS.K[i,j]
- b2 = oS.b - Ei -oS.labelMat[i]*(oS.alphas[i] - alphaIold)*oS.K[i,j] - oS.labelMat[j]*(oS.alphas[j]-alphaJold)*oS.K[j,j]
- if(0 < oS.alphas[i]) and (oS.C > oS.alphas[i]):oS.b = b1
- elif(0 <oS.alphas[j]) and (oS.C > oS.alphas[j]):oS.b =b2
- else : oS.b = (b1+b2)/2.0
- return 1
- else: return 0
复制代码- #SMO核函数
- def smoP(dataMatIn,classLabels,C,toler,maxIter,kTup=('lin',0)):
- oS = optStruct(mat(dataMatIn),mat(classLabels).transpose(),C,toler,kTup)
- iter =0
- entireSet = True; alphaPairsChanged = 0
- while (iter < maxIter) and ((alphaPairsChanged>0) or (entireSet)):
- alphaPairsChanged = 0
- if entireSet:
- for i in range(oS.m):
- alphaPairsChanged +=innerL(i,oS)
- print "fullSet, iter: %d i:%d,pairs changed %d"%(iter,i,alphaPairsChanged)
- iter +=1
- else :
- nonBoundIs = nonzero((oS.alphas.A>0) * (oS.alphas.A < C))[0]
- for i in nonBoundIs:
- alphaPairsChanged += innerL(i,oS)
- print "non-bound,iter:%d i %d,pairs changed %d" %(iter,i,alphaPairsChanged)
- iter +=1
- if entireSet : entireSet =False
- elif (alphaPairsChanged == 0) : entireSet =True
- print "iteration number:%d" % iter
- return oS.b,oS.alphas
复制代码- # 计算W
- def calcWs(alphas,dataArr,classLabels):
- X = mat(dataArr);labelMat = mat(classLabels).transpose()
- m,n = shape(X)
- w = zeros((n,1))
- for i in range(m):
- w += multiply(alphas[i]*labelMat[i],X[i,:].T)
- return w
复制代码- #核函数测试方法
- def testRbf(k1=1.3):
- dataArr,labelArr = loadDataSet('testSetRBF.txt')
- b,alphas = smoP(dataArr,labelArr,200,0.0001,10000,('rbf',k1))
- datMat=mat(dataArr);labelMat = mat(labelArr).transpose()
- svInd=nonzero(alphas.A>0)[0]
- sVs = datMat[svInd]
- labelSV=labelMat[svInd];
- print "there are %d Support Vectors "% shape(sVs)[0]
- m,n=shape(datMat)
- errorCount = 0
- for i in range(m):
- # 对数据进行核转换
- kernelEval = kernelTrans(sVs,datMat[i,:],('rbf',k1))
- #核转换后的数据与前面的alpha及类别标签值求积
- predict=kernelEval.T * multiply(labelSV,alphas[svInd]) + b
- if sign(predict)!=sign(labelArr[i]):errorCount +=1
- print "the training error rate is: %f" % (float(errorCount)/m)
- dataArr,labelArr = loadDataSet('testSetRBF2.txt')
- errorCount = 0
- datMat=mat(dataArr);labelMat = mat(labelArr).transpose()
- m,n = shape(datMat)
- for i in range(m):
- kernelEval = kernelTrans(sVs,datMat[i,:],('rbf',k1))
- predict = kernelEval.T * multiply(labelSV,alphas[svInd]) +b
- if sign(predict) != sign(labelArr[i]) : errorCount +=1
- print "the test error rate is:%f"%(float(errorCount)/m)
复制代码
测试核函数 >>> svmMLiA.testRbf()
手写识别问题回顾:
python代码如下:
- # 手写识别问题回顾
- def img2vector(filename):
- returnVect = zeros((1,1024))
- # print("filename",filename)
- fr = open(filename)
- for i in range(32):
- lineStr=fr.readline()
- for j in range(32):
- returnVect[0,32*i+j] = int(lineStr[j])
- return returnVect
复制代码
- def loadImages(dirName):
- from os import listdir
- hwLabels = []
- trainingFileList = listdir(dirName)
- m = len(trainingFileList)
- trainingMat = zeros((m,1024))
- for i in range(m):
- fileNameStr = trainingFileList[i]
- fileStr = fileNameStr.split('.')[0]
- classNumStr = int(fileStr.split('_')[0])
- if classNumStr == 9 : hwLabels.append(-1)
- else:hwLabels.append(1)
- trainingMat[i,:] = img2vector('%s/%s' % (dirName,fileNameStr))
- return trainingMat,hwLabels
复制代码
- def testDigits(kTup=('rbf',10)):
- dataArr,labelArr = loadImages('trainingDigits')
- b,alphas= smoPK(dataArr,labelArr,200,0.0001,10000,kTup)
- datMat=mat(dataArr);labelMat=mat(labelArr).transpose()
- svInd=nonzero(alphas.A>0)[0]
- sVs=datMat[svInd]
- labelSV = labelMat[svInd];
- print "there are %d Support Vectors"%shape(sVs)[0]
- m,n = shape(datMat)
- errorCount = 0
- for i in range(m):
- kernelEval = kernelTrans(sVs,datMat[i,:],kTup)
- predict=kernelEval.T * multiply(labelSV,alphas[svInd]) + b
- if sign(predict) !=sign(labelArr[i]):errorCount +=1
- print "the training error rate is: %f" %(float(errorCount)/m)
- dataArr,labelArr = loadImages('testDigits')
- errorCount = 0
- datMat = mat(dataArr);labelMat = mat(labelArr).transpose()
- m,n = shape(datMat)
- for i in range(m):
- kernelEval = kernelTrans(sVs,datMat[i,:],kTup)
- predict=kernelEval.T * multiply(labelSV,alphas[svInd]) + b
- if sign(predict) != sign(labelArr[i]):errorCount +=1
- print "the test error rate is: %f" %(float(errorCount)/m)
复制代码
运行代码: >>> svmMLiA.testDigits(('rbf',20)) |