"""
scikit-learn 패키지를 이용한 kNN(k Nearest Neighbor: 최근접 이웃)
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, classification_report
# 1. 데이터 준비
col_names = ['sepal-length', 'sepal-width',
'petal-length', 'petal-width',
'Class']
# csv 파일에서 DataFrame을 생성
dataset = pd.read_csv('iris.csv', encoding='UTF-8', header=None, names=col_names)
# DataFrame 확인
print(dataset.shape) # (row개수, column개수)
print(dataset.info()) # 데이터 타입, row 개수, column 개수, 컬럼 데이터 타입
print(dataset.describe()) # 요약 통계 정보
print(dataset.head())
print(dataset.iloc[0:5]) # dataset.head()
print(dataset.iloc[-5:]) # dataset.tail()
# 데이터 전처리:
# 데이터 세트를 데이터(포인트)와 레이블로 구분
# X = 전체 행, 마지막 열 제외한 모든 열 데이터 -> n차원 공간의 포인트
X = dataset.iloc[:, :-1].to_numpy() # DataFrame을 np.ndarray로 변환
# print(X)
# y = 전체 행, 마지막 열 데이터
y = dataset.iloc[:, 4].to_numpy()
print(y)
# 전체 데이터 세트를 학습 세트(training set)와 검증 세트(test set)로 나눔
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
print(len(X_train), len(X_test))
print(X_train[:3])
print(y_train[:3])
# 3. 거리 계산을 위해서 각 특성들을 스케일링(표준화)
# Z-score 표준화: 평균을 0, 표준편차 1로 변환
scaler = StandardScaler() # Scaler 객체 생성
scaler.fit(X_train) # 스케일링(표준화)를 위한 평균과 표준 편차 계산
X_train = scaler.transform(X_train) # 스케일링(표준화 수행)
X_test = scaler.transform(X_test)
# 스케일링(z-score 표준화 수행 결과 확인)
for col in range(4):
print(f'평균 = {X_train[:, col].mean()}, 표준편차= {X_train[:, col].std()}')
for col in range(4):
print(f'평균 = {X_test[:, col].mean()}, 표준편차= {X_test[:, col].std()}')
# 4. 학습/예측(Training/Pradiction)
# k-NN 분류기를 생성
classifier = KNeighborsClassifier(n_neighbors=5)
# 분류기 학습
classifier.fit(X_train, y_train)
# 예측
y_pred = classifier.predict(X_test)
print(y_pred)
# 5. 모델 평가
conf_matrix = confusion_matrix(y_test, y_pred)
print(conf_matrix)
report = classification_report(y_test, y_pred)
print(report)
# 정확도(accuracy) = (전체 정답 수) / (전체 문제 수)
# 6. 모델 개선 - k값을 변화시킬 때, 에러가 줄어드는 지
errors = []
for i in range(1, 31):
knn = KNeighborsClassifier(n_neighbors=i)
knn.fit(X_train, y_train)
pred_i = knn.predict(X_test)
errors.append(np.mean(pred_i != y_test))
print(errors)
plt.plot(range(1, 31), errors, marker='o')
plt.title('Mean error with K-Value')
plt.xlabel('k-value')
plt.ylabel('mean error')
plt.show()
위를 참고한 함수를 직접 구현
import numpy as np
def train_test_split(X, y, test_size):
"""
len(X) == len(y),
test_size : 0.0 ~ 1.0 사이의 수
:param X: numpy.ndarray. n * m
:param y: numpy.ndarray. 원소의 개수가 n개인 1차원 배열
:test_size: 전체 데이터 대비 test sample 크기의 비율
:return: X_train, X_test, y_train, y_test
"""
# 인덱스를 저장하는 배열
length = len(X)
indices = np.array([i for i in range(length)])
print('shuffle 전: ', indices)
# 인덱스를 임의로 섞음
np.random.shuffle(indices)
print('shuffle 후: ', indices)
cut = int(length * (1 - test_size))
X_train = X[indices[:cut]] # Train set points
y_train = y[indices[:cut]] # Train set labels
X_test = X[indices[cut:]] # Test set points
y_test = y[indices[cut:]] # Test set labels
return X_train, X_test, y_train, y_test
np.random.seed(1210)
X = np.random.randint(10, size=(5, 2))
print(X)
y = np.array(['a', 'b', 'a', 'b', 'a']) # labels
print(y)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
print(X_train)
print(y_train)
print(X_test)
print(y_test)
[[6 7]
[8 9]
[4 2]
[8 2]
[0 2]]
['a' 'b' 'a' 'b' 'a']
shuffle 전: [0 1 2 3 4]
shuffle 후: [0 3 1 2 4]
[[6 7]
[8 2]
[8 9]
[4 2]]
['a' 'b' 'b' 'a']
[[0 2]]
['a']
class MyScaler:
def fit(self, X):
"""X의 각 특성(컬럼)들의 평균과 표준 편차를 저장"""
# 컬럼별로 평균을 계산해서 저장
self.feature_means = np.mean(X, axis=0) # axis=0: 컬럼별 계산
# 컬럼별로 표준 편차를 계산해서 저장
self.feature_stds = np.std(X, axis=0)
print(self.feature_mens)
print(self.feature_stds)
def transform(self, X):
"""X의 평균을 0, 표준 편차를 1로 변환해서 리턴"""
# X와 같은 크기의 비어있는 배열을 만든다.
dim = X.shape
transformed = np.empty(dim)
for row in range(dim[0]): # row 개수만큼 반복
for col in range(dim[1]): # column 개수만큼 반복
# x_new = (x - mean) / std
transformed[row, col] = (X[row, col] - self.feature_means[col]) / \
self.feature_stds[col]
return transformed
scaler = MyScaler() # 객체 생성
scaler.fit(X_train) # 객체가 가지고 있는 메소드 호출
X_train_scaled = scaler.transform(X_train)
print(X_train_scaled)
X_test_scaled = scaler.transform(X_test)
print(X_test_scaled)
[6.5 5. ]
[1.6583124 3.082207 ]
[[-0.30151134 0.64888568]
[ 0.90453403 -0.97332853]
[ 0.90453403 1.29777137]
[-1.50755672 -0.97332853]]
[[-3.91964748 -0.97332853]]
class MyKnnClassifier:
def __init__(self, n_neighbors=5): # 객체 생성
"""최근접 이웃으로 선택할 개수를 저장함."""
self.k = n_neighbors
def fit(self, X_train, y_label): # 모델 훈련
"""레이블을 가지고 있는 데이터(point)를 저장함."""
self.points = X_train
self.labels = y_label
def predict(self, X_test): # 예측
"""테스트 세트 X_test의 각 점들마다,
1) 학습 세트에 있는 모든 점들과의 거리를 계산.
2) 계산된 거리들 중에서 가장 짧은 거리 k개를 선택.
3) k개 선택된 레이블들 중에서 가장 많은 것(다수결)을 예측값으로 함."""
predicts = [] # 예측값들을 저장할 리스트
for test_pt in X_test: # 테스트 세트에 있는 점들의 개수만큼 반복
# 학습 세트의 점들과의 거리를 계산
distances = self.distance(self.points, test_pt)
print(test_pt)
print(distances)
# 다수결로 예측값 결정
# winner = majority_vote(distances)
# 예측값을 리스트에 저장
predicts.append(winner)
return np.array(predicts) # 예측값들의 배열을 리턴
.
def distance(self, X, y):
"""점(벡터) y와 점(벡터)들 X의 거리들의 배열을 리턴"""
print('print distance:', np.sqrt(np.sum((X - y) ** 2, axis=1)))
return np.sqrt(np.sum((X - y) ** 2, axis=1))
print distance: [3.96515927 4.82418151 5.33203732 2.41209076]
knn = MyKnnClassifier(n_neighbors=3) # k-NN 분류기 객체 생성 - 생성자 호출
print('k =', knn.k)
knn.fit(X_train_scaled, y_train)
knn.predict(X_test_scaled)
k = 3
[-3.91964748 -0.97332853]
[3.96515927 4.82418151 5.33203732 2.41209076]
def majority_vote(self, distances):
# 거리 순서로 정렬된 인덱스를 찾는다. -> np.argsort 함수
indices_by_distance = np.argsort(distances)
print('indices_by_distance:', indices_by_distance)
# 가장 가까운 k개의 레이블을 찾는다
k_nearest_neighbor = []
# for i in range(self.k):
# idx = indices_by_distance[i]
# k_nearest_neighbor.append(self.labels[idx])
for i in indices_by_distance[0:self.k]:
k_nearest_neighbor.append(self.labels[i])
print('k_nearest_neighbor: ',k_nearest_neighbor)
# 가장 많은 득표를 얻은 레이블을 찾는다.
vote_counts = Counter(k_nearest_neighbor)
print('vote_counts: ', vote_counts)
# most_common(n): 가장 많은 빈도수 n순위까지의 리스트
# 빈도수가 동률일 수도 있다
print('vote_counts.most_common(2): ', vote_counts.most_common(2)) # 2등까지 출력
winner, winner_count = vote_counts.most_common(1)[0]
return winner
k_nearest_neighbor: ['a', 'a', 'b']
vote_counts: Counter({'a': 2, 'b': 1})
vote_counts.most_common(2): [('a', 2), ('b', 1)]
knn = MyKnnClassifier(n_neighbors=3) # k-NN 분류기 객체 생성 - 생성자 호출
print('k =', knn.k)
knn.fit(X_train_scaled, y_train)
y_pred = knn.predict(X_test_scaled)
print('y_pred: ',y_pred)
y_pred: ['a']