Spark & 머신 러닝 - Anomaly Detection

이 포스트는 K-means Clustering을 이용하여 네트워크 트래픽에서의 비정상 트래픽을 감지해 내는 과정에 대한 내용을 담고 있다. 이 장에서 수행한 결과는 수행시마다 바뀌기 때문에, 수행 결과가 이 문서에서 제시하는 결과와 완벽히 일치하지 않는다.

이 포스트는 Advanced Analytics with Spark을 정리한 글이다.

Unsupervised Learning and Anomaly Detection

앞선 챕터에서 살펴본 Classification과 Regression은 강력하기도 하며 많은 연구가 진행된 머신 러닝 테크닉중 하나이다. 하지만 Decision Tree나, Random Dicision Forest등을 이용하여 새롭게 들어온 데이터를 이용해 어떤 값을 예측을 할 때는 그 값이 이전 훈련 과정에서 이미 알고 있는 값 중 하나여야만 했다. 그리고 이러한 과정이 Supervised Learning의 일반적인 방식이라 설명하였다.

하지만 다음과 같은 경우가 있을 수 있다. 어떤 인터넷 쇼핑몰 사이트의 회원을 그들의 쇼핑 습관과 관심 상품등을 기준으로 구분하고자 한다. 이 때의 입력 Feature는 사용자들의 구매 목록, 클릭한 내용들, 통계학적 데이터와 같은 것들이 될 수 있다. 이러한 입력 데이터들의 결과로 아웃풋은 사용자들의 그룹이 되어야 하는데, 예를 들면 한 그룹은 패션에 민감한 사람들의 그룹 또 다른 그룹은 가격에 민감한 사람들의 그룹과 같은 형태를 나타내어야 한다.

만약 Decision Tree와 같은 Supervised Learning 테크닉만 사용하여 위 문제를 해결하려 한다면 새로운 데이터를 각각 Classifier에 적용시켜야 하는데 어떤 데이터가 어떤 값을 가져야 하는지에 대한 사전정보가 전혀 없기 때문에 불가능에 가깝다. 떄문에 이런 문제를 해결할 때에는 Supervised Learning이 아닌 Unsupervised Learning을 이용하여 해결해야 한다. Unsupervised Learning은 Supervised Learning과 다르게 어떤 값을 예측해야 한다고 하여 사전에 그 값이 어떤 것인지 트레이닝 시킬 필요가 없다. 왜냐하면 Unsupervised Learning에 속하는 방법들은 주어진 데이터들 사이에서 비슷한 것들끼리 그룹을 만들고, 새로운 데이터가 어떤 그룹에 속하는지 판단하는 역할을 하기 때문이다.

Anomaly Detection은 이름에서도 알 수 있듯이 비정상적인 것을 찾는 것을 의미한다. Anomaly Detection은 네트워크 공격 혹은 서버에서의 문제 발생등을 검출하는데에 이용된다. 여기서 중요한 것은 새로운 형태의 공격이라던가 그동안 발생하지 않았던 서버 문제등을 발견할 수 있다는 것이다. Unsupervised Learning은 이 경우에 일반적인 형태의 데이터를 이용해 훈련하고, 기존에 있던 것과 다른 것이 입력으로 들어왔을 때 그것을 감지하는 형태로 Anomaly Detection 문제를 해결할 수 있다.



K-means Clustering

클러스터링 방법은 Unsupervised Learning 중에서 가장 잘 알려진 방법 중 하나이다. 클러스터링은 주어진 데이터를 이용하여 가장 자연스러운 그룹을 찾아내는 것을 시도하는 알고리즘이다. K-means Clustering은 이러한 클러스터링 알고리즘 중에 가장 널리 알려진 알고리즘이다. 데이터셋에서 \(k\)개의 클러스터를 찾는데 이 때 \(k\)는 데이터를 분석하는 사람으로부터 주어진 것이다. 또한 이 \(k\)는 Hyperparameter이며, 각각의 데이터마다 최적 값이 다르다. 실제로 이 챕터에서 적절한 \(k\)값을 찾는 과정이 중요한 부분이 될 것이다.

사용자들의 행동에 대한 데이터 혹은 거래 기록에 대한 데이터에서 비슷하다라는 것은 어떤 것인가? 이러한 질문에 대답하기 위하여 K-means Clustering 방식은 데이터 사이의 거리에 대한 정의를 필요로한다. 가장 간단한 방법중 하나는 Spark MLlib에도 구현되어 있는 Eculidean Distance를 이용하는 것이다. 이것을 이용하면 비슷한데이터는 거리가 가깝게 나올 것이다.

K-means Clustering에서의 각 클러스터는 하나의 데이터 포인트로 표현된다. 각 클러스터에 속하는 데이터 포인트들의 평균값이 그 클러스터의 중심이 되며, 각 클러스터의 중심을 평균으로 구하기 때문에 K-means라는 이름이 붙었다. 이 때의 가정은 각 Feature의 값들이 숫자 형태의 값임을 가정으로 하며, 각 클러스터의 중심은 centroid라고 부른다. K-means Clustering의 동작 과정은 매우 간단하다. 제일 먼저 알고리즘은 \(k\)개의 데이터를 선택함으로써 각 클러스터의 centroid를 초기화한다. 그리고 각 데이터는 가장 가까운 centroid의 클러스터로 할당된다. 그리고 각각의 클러스터별로 새로운 데이터의 평균을 구해 새로운 centorid로 지정한다. 이 과정이 반복된다.



Network Intrusion

사이버 공격이라는 형태의 해킹이 뉴스에서도 심심치 않게 등장하고있다. 몇몇 공격들은 네트워크 트래픽을 점령하여 정상적인 트래픽을 밀어내기도 한다. 하지만 네트워킹 소프트웨어의 결점 등을 이용해 컴퓨터에 대한 비정상적인 권한을 탈취하는 공격도 존재한다. 이 때 공격받는 컴퓨터에서는 공격받는지 알아채기가 매우 어렵다. 이러한 공격(Exploit)을 찾아내기 위해선 매우 많은 네트워크 요청들 사이에 비정상적인 공격을 찾아내야 하기 때문이다.

몇몇 공격들은 특정 알려진 패턴을 따른다. 예를 들면 가능한 모든 포트를 빠른 시간안에 접근하는데, 이것은 일반 소프트웨어들은 일반적으로 하지 않는 패턴이다. 하지만 이것은 일반적으로 Expolit을 하기 위한 컴퓨터를 찾는 공격자들이 일반적으로 제일 첫단계로 수행하는 과정이다.

만약 짧은 시간 내애 몇개의 포트에 대해 접속 시도가 발생한다면 몇 개의 접속 시도는 일반적인 것으로 간주 될 수 있지만 대부분의 시도들은 비정상 적인 것일 것이므로 우리는 이것을 port-scanning 공격이 들어온 것으로 판단할 수 있다. 이러한 이미 알려진 형태의 공격들은 미리 알고 감지할 수 있다. 하지만 알려져 있지 않은 형태의 공격이 들어온다면 어떻게 해야될까? 가장 큰 문제점은 어떤 형태의 공격일지 모른다는 것이다. 그동안과의 다른 형태의 접근 시도를 잠재적인 공격 트래픽으로 간주하여 감시해야 할 것이다.

여기에서 Unsupervised Learning이 사용될 수 있다. K-means Clustering과 같은 방법을 이용하면 비정상적인 네트워크 연결을 탐지해 낼 수 있다. K-means Clustering을 이용하여 네트워크 연결을 클러스터링 하고 기존의 정상적인 네트워크 연결의 클러스터와는 다른 연결이 요청되었을 때, 이것을 비정상적인 연결이라고 판단할 수 있다.



KDD Cup 1999 Data Set

KDD Cup은 ACM에서 매년 열리는 데이터 마이닝 대회이다. 각 해마다 머신 러닝 문제가 주어지고, 그것을 얼마만큼의 정확도로 해결하는가에 대한 대회이다. 1999년에 열렸던 대회는 네트워크 침입에 대한 대회였는데 데이터셋은 지금도 접근 가능하다. 이 포스트에서는 이 데이터를 Spark을 통해 분석하여 비정상적인 접근을 찾아내도록 할 것이다.

다행히도 대회 개최자들이 이미 Raw 네트워크 데이터를 전처리하여 요약한 데이터를 제공하며 약 743MB의 490만개의 네트워크 연결에 대한 데이터이다. 데이터가 엄청 많은 것은 아니지만 이 챕터에서 수행하고자 하는 것들을 수행해보기에는 적절하다. 데이터 셋은 이 링크에서 다운로드 할 수 있다.

데이터의 각 행은 전달된 바이트 수, 로그인 시도, TCP 에러 등과 같은 정보를 포함한다. 각 데이터는 CSV 형태로 존재하며 제일 마지막 레이블을 제외한 41개의 Feature로 구성되어 있다.

0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.

위와 같은 형태로 존재하는데, TCP 연결, HTTP 서비스, 215바이트 송신, 45076바이트 수신과 같은 정보를 나타내며 각 Feature의 의미는 다음과 같다.

데이터를 살펴보면 많은 값이 15번째 컬럼과 같이 0 혹은 1임을 알 수 있다. 이것은 해당 값이 있는지 없는지 여부를 나타내며 이전 포트스에서처럼 특정 값을 비트로 표현한 것이 아니라 각각의 Feature가 특정 의미를 갖고 있는 것이다. 그리고 dst_host_srv_rerror_rate 이후의 컬럼은 0.0에서 1.0까지의 값을 갖는다.

가장 마지막 필드에는 해당 네트워크 연결의 레이블이 주어져있다. 대부분의 레이블이 normal이지만 몇몇 값들은 다음과 같이 특정 공격에 대한 타입이 기입되어 있다.

0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf.

이러한 정보들은 훈련 과정에서 유용하게 사용될 수 있지만, 이 장에서 하고자 하는 것은 비정상적인 접근 트래픽을 감지하는 것이기 때문에 잠재적으로 새롭고 알려지지 않은 접근을 찾아낼 것이다. 따라서 이 레이블 정보는 거의 제외된 상태로 사용될 것이다.



A First Take on Clustering

kddcup.data.gz파일의 압축을 해제하고, kddcup.data.corrected 파일을 HDFS로 업로드한다. 이 포스트에서는 /kdd/kddcup.data.corrected경로에 업로드 한 것을 가정한다.

Spark-shell에서 다음과 같이 파일을 로드하고, 각 레이블별로 어느정도의 양이 있는지 확인함으로써 간단히 데이터를 확인한다.

val rawData = sc.textFile("/kdd/kddcup.data.corrected")
rawData.map(_.split(',').last).countByValue().toSeq.sortBy(_._2).reverse.foreach(println)

다음과 같이 23개의 레이블이 존재하며, 가장 많은 공격 형태는 smurf 이고, neptune 순서이다.

(smurf.,2807886)
(neptune.,1072017)
(normal.,972781)
...

K-means Clustering을 수행하기 전에 주의해야 할 것이 있다. KDD 데이터셋은 숫자 형태의 데이터가 아닌 Feature(nonnumeric feature)가 있다. 예를 들면 두 번째 열과 같은 경우에는 그 값이 tcp, udp, icmp와 같은 값들이다. 하지만 K-means Clustering에서는 숫자 형태의 Feature만 사용할 수 있다. 처음에는 이 값을들 무시하고 진행을 할 것이다. 그리고 뒷 부분에서 이 Categorical Feature를 포함하여 클러스터링을 수행할 것이다.

다음의 코드는 데이터를 파싱하여 K-means Clustering에 필요한 데이터로 필터링 하는 과정이다. 이 과정에서 제일 마지막의 레이블을 포함한 Categorical Feature를 제외한다. 그리고 이 데이터를 이용하여 K-means Clustering을 수행한다.

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.clustering._

val labelsAndData = rawData.map { line =>
    val buffer = line.split(',').toBuffer
    buffer.remove(1, 3)
    val label = buffer.remove(buffer.length - 1)
    val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
    (label, vector)
}

val data = labelsAndData.values.cache()

val kmeans = new KMeans()
val model = kmeans.run(data)

model.clusterCenters.foreach(println)

위 코드의 수행 결과로 다음과 같이 두 개의 클러스터의 중심 벡터가 출력될 것이다. 그것은 K-means Clustering을 통해 \(k=2\)로 데이터가 맞춰졌다는 것이다.

[48.34019491959669,1834.6215497618625,826.2031900016945,5.7161172049003456E-6,6.487793027561892E-4,7.961734678254053E-6,0.012437658596734055,3.205108575604837E-5,0.14352904910348827,0.00808830584493399,6.818511237273984E-5,3.6746467745787934E-5,0.012934960793560386,0.0011887482315762398,7.430952366370449E-5,0.0010211435092468404,0.0,4.082940860643104E-7,8.351655530445469E-4,334.9735084506668,295.26714620807076,0.17797031701994342,0.1780369894027253,0.05766489875327374,0.05772990937912739,0.7898841322630883,0.021179610609908736,0.02826081009629284,232.98107822302248,189.21428335201279,0.7537133898006421,0.030710978823798966,0.6050519309248854,0.006464107887636004,0.1780911843182601,0.17788589813474293,0.05792761150001131,0.05765922142400886]

[10999.0,0.0,1.309937401E9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,0.0,0.0,255.0,1.0,0.0,0.65,1.0,0.0,0.0,0.0,1.0,1.0]

하지만 앞서 데이터를 살펴보았던 것과 마찬가지로 총 23개의 레이블이 존재하는데, 단순히 두 개의 클러스터로 구분하는 것은 적절치 않다는 것을 직관적으로 알 수 있다. 때문에 이미 알고 있는 레이블 정보를 이용하여 클러스터링 결과가 실제 데이터와 어떤 차이를 보이는지 확인 해 보는것은 많은 도움이 된다. 다음 코드를 이용하면 각 클러스터별로 어떤 형태의 공격에 대한 레이블이 포함되었는지 확인할 수 있다.

val clusterLabelCount = labelsAndData.map { case (label, datum) =>
    val cluster = model.predict(datum)
    (cluster, label)
}.countByValue

clusterLabelCount.toSeq.sorted.foreach {
    case ((cluster, label), count) =>
    println(f"$cluster%1s$label%18s$count%8s")
}

그 결과는 다음과 같다.

0             back.    2203
0  buffer_overflow.      30
0        ftp_write.       8
0     guess_passwd.      53
0             imap.      12
0          ipsweep.   12481
0             land.      21
0       loadmodule.       9
0         multihop.       7
0          neptune. 1072017
0             nmap.    2316
0           normal.  972781
0             perl.       3
0              phf.       4
0              pod.     264
0        portsweep.   10412
0          rootkit.      10
0            satan.   15892
0            smurf. 2807886
0              spy.       2
0         teardrop.     979
0      warezclient.    1020
0      warezmaster.      20
1        portsweep.       1

위 결과는 portsweep 공격에 대한 데이터를 제외한 모든 데이터들이 0번 클러스터에 할당되어 있음을 알 수 있다.



Choosing K

앞선 결과를 통해 단순히 두개의 클러스터는 부족하다는 것을 알 수 있다. 그렇다면 몇 개의 클러스터가 이 데이터셋에 적절할것인가? 이 데이터에 존재하는 레이블의 종류가 23개 이므로 \(k\)는 최소 23과 비슷하거나 큰 것이 결과가 좋을 것이다. 일반적으로 \(k\)를 결정할 때는 많은 값들이 고려된다. 근데, 어떤 것이 과연 "좋은" \(k\)일까?

클러스터링이 잘 되었다는 것은 각 데이터와 그 데이터가 속하는 클러스터의 중심과 거리가 가까운 것을 이야기한다. 그래서 그것을 계산하기 위해 Eculidean distance를 계산하는 함수를 정의하고, 그 함수를 이용해 각 데이터가 가장 가까운 클러스터의 중심과 얼마만큼 떨어져 있는지 계산할 것이다.

def distance(a: Vector, b: Vector) = {
    math.sqrt(a.toArray.zip(b.toArray).map(p => math.pow(p._1 - p._2, 2)).sum)
}

def distToCentroid(datum: Vector, model: KMeansModel) = {
    val cluster = model.predict(datum)
    val centroid = model.clusterCenters(cluster)
    distance(centroid, datum)
}

이 함수를 이용하여 클러스터의 갯수 \(k\)가 주어졌을 때, 각 데이터와 가장 가까운 클러스터 사이의 거리에 대한 평균값을 계산할 수 있다.

import org.apache.spark.rdd._

def clusteringScore(data: RDD[Vector], k: Int) = {
    val kmeans = new KMeans()
    kmeans.setK(k)
    val model = kmeans.run(data)
    data.map(datum => distToCentroid(datum, model)).mean()
}

(5 to 40 by 5).map(k => (k, clusteringScore(data, k))).foreach(println)

다음과 같은 결과를 볼 수 있는데, 클러스터의 수가 늘어날수록 각 데이터와 클러스터의 중심점과의 거리 평균이 줄어드는 경향을 보인다는 것을 알 수 있다.

(5,1938.8583418059188)
(10,1661.2533261157496)
(15,1405.536523064836)
(20,1111.7423030526104)
(25,946.2578661660172)
(30,597.6141598314152)
(35,748.4808532423143)
(40,513.382773134806)

하지만, 이러한 결과는 너무 뻔한 것이다. 많은 클러스터 중심을 이용 할 수록 데이터들이 각 클러스터의 중심과 가까울 것은 명백하고, 만약 데이터의 수와 클러스터의 수를 동일하게 설정한다면 각 데이터가 하나의 클러스터가 될 것이므로 각 데이터와 클러스터의 중심 사이의 거리는 0일 것이다. 또한 이상하게도 클러스터 개수가 35개일 때의 거리의 평균이 클러스터 개수가 30개일 때보다도 높다. 이것은 반드시 \(k\)가 높을 때에도 적은 \(k\)로 좋은 클러스터링을 수행하는 것을 허용하기 때문이다. 이것은 K-means Clustering은 꼭 주어진 \(k\)만을 이용해서 클러스터링을 수행해야만 하는 것은 아님을 이야기한다. 이것은 반복 과정에서 좋긴 하지만 최적은 아닌 local minimum 등에서 클러스터링이 멈출 수 있음을 의미한다. 이러한 것은 좀 더 지능적인 K-means Clustering을 이용하여 좋은 initial centroid(초기 중심점)을 선택할 때에도 마찬가지로 존재하는 문제이다. 예를 들면 K-means++, K-means|| 과 같은 방법들은 여러 선택 알고리즘을 이용하여 다양하고, 분산된 중심점들을 선택하여 가장 기초적인 K-means Clustering 방식보다 좋은 결과를 이끌어낸다. 하지만 이러한 방법들도 마찬가지로 무작위(Random)적인 선택을 기반으로 하기 때문에 최적의 클러스터링을 보장하지는 않는다.

이 결과를 반복 횟수를 증가시킴으로써 성능을 증가시킬수 있다. setRuns()함수는 하나의 \(k\)마다 클러스터링을 수행하는 횟수를 설정하는 것이며, setEpsilon()함수는 각 반복마다 확인하는 클러스터의 중심의 이동한 차이에 대한 Threshold이다. Epsilon을 조절함으로써 각 클러스터링의 수행마다 얼마만큼의 반복이 수행될 지를 결정하는데 영향을 미친다. Epsilon을 큰 값으로 설정하면 클러스터 중심점의 변경에 민감하지 않을 것이고 작은 값으로 설정한다면 작은 클러스터 중심점의 변경에도 민감하게 반응할 것이다.(민감하다는 것은 적은 차이에도 새로운 반복을 수행한다는 것)

다음과 같이 해당 부분들을 변경시켜 수행할 수 있다.

kmeans.setRuns(10)
kmeans.setEpsilon(1.0e-6) //Epsilon의 기본값은 1.0e-4

(30 to 100 by 10).par.map(k => (k, clusteringScore(data, k))).toList.foreach(println)

위 코드의 수행 결과로 이제는 평가 수치가 점점 감소하는 결과를 보인다.

(30,654.6479150668039)
(40,833.771092804366)
(50,408.29802592345465)
(60,298.43382866843206)
(70,256.6636429518882)
(80,163.15198293023954)
(90,135.78737972772348)
(100,118.73064012152163)

이 결과를 이용하여 최적의 \(k\)를 선택해야 하는데, Elbow를 선택하는 방법을 이용하면 \(k=50, k=80\) 혹은 가장 작은 \(k=100\) 등의 값이 적절해보인다.



Visualization in R

이 시점에서 데이터들이 어떤 형태로 구성되어 있는가 확인해보는 것은 많은 도움이 된다. 하지만 Spark에는 자체적인 시각화 도구를 갖고 있지 않기 때문에 다른 도구의 도움을 받아야 한다. 하지만 Spark에서 수행하는 결과들은 쉽게 HDFS로 저장될 수 있고, 그것은 쉽게 다른 통계 툴에서 사용할 수 있다. 이 챕터에서는 R을 이용하여 데이터를 시각화 해 볼 것이다.

R은 2차원 혹은 3차원의 데이터를 시각화 하는 반면 우리가 지금까지 사용해 온 데이터는 38차원이다. 때문에 이 데이터를 최대 3차원을 갖는 데이터로 줄여야(Project)할 필요성이 있다. 게다가 R에서는 많은 데이터를 처리하기에는 무리가 있다. 따라서 데이터의 수 역시 샘플링을 통해 줄여야 한다.

시작에 앞서 \(k=100\)을 이용해 모델을 생성하고, 각 데이터가 속하는 클러스터의 번호를 각 데이터에 매핑한다. 그리고 이것을 CSV 형태로 HDFS에 출력한다.

val kmeans = new KMeans()
kmeans.setK(100)
kmeans.setRuns(5)
kmeans.setEpsilon(1.0e-6)

val model = kmeans.run(data)

val sample = data.map(datum =>
    model.predict(datum) + "," + datum.toArray.mkString(",")
).sample(false, 0.05)

sample.saveAsTextFile("/kdd/sample")

sample 함수는 전체 데이터에서 원하는 만큼의 데이터를 샘플링 할 수 있도록 해 준다. 이 예시에서는 전체 데이터 중 5%를 반복 없이 샘플링한다.

다음의 R 코드는 HDFS에서 CSV파일을 읽어서 활용하는 코드이다. 이 코드에서는 rgl이라는 패키지를 이용하여 데이터를 그래프에 나타낸다. 이 과정에서 38차원의 데이터 중 무작위로 세 개의 Unit Vector를 선택하여 그 값을 이용해 3차원 벡터를 생성하고, 그것을 그래프에 그린다. 이 과정은 간단한 과정의 Dimension Reduction(차원 감소)이다. 물론 이 과정보다 훨씬 정한 알고리즘들(ex: PCA, SVD)이 존재하고, 이것을 Spark에서나 R에서 수행이 가능하지만 이것을 추가로 수행하는데에 추가적인 시간이 소요되고, 이 챕터의 내용을 벗어나는 것이어서 사용하지는 않는다.

(다음 코드를 수행하기 위해서는 rgl패키지를 설치할 수 있는 환경이 구성되어야 한다.)

install.packages("rgl") #처음 한번만 수행하면 된다.
library(rgl)

#HDFS로부터 CSV형태의 데이터 읽기
clusters_data <- read.csv(pipe("hdfs dfs -cat /kdd/sample/*"))
clusters <- clusters_data[1]
data <- data.matrix(clusters_data[-c(1)])
rm(clusters_data)

#Random Unit Vector 생성
random_projection <- matrix(data = rnorm(3*ncol(data)), ncol = 3)
random_projection_norm <-
    random_projection / sqrt(rowSums(random_projection * random_projection))

projected_data <- data.frame(data %*% random_projection_norm)

num_clusters <- nrow(unique(clusters))
palette <- rainbow(num_clusters)
colors = sapply(clusters, function(c) palette[c])
plot3d(projected_data, col = colors, size = 10)

위 코드의 수행 결과로는 다음과 같은 형태의 3D 그래프가 출력된다.

Random 3D Projection

이 결과는 각 클러스터의 번호가 색깔로 구분된 데이터들의 좌표(위 그림에서는 하나의 클러스터가 대부분을 차지하는 것으로 나오지만 확대하면 다양한 클러스터의 색깔을 볼 수 있다.)로 표현된 것이다. 데이터를 어떻게 분석해야 할지 막막하지만, 데이터의 분포가 "L"자 형태인 것만은 분명하다. 그리고 그 L의 한 쪽은 길고, 한쪽은 짧다.

이것은 Feature중에 그 값의 범위가 다른 Feature에 비해 큰 것들이 존재한다는 것이다. 예를 들면 대부분의 Feature의 값은 0과 1 사이인 것에 비해 주고받은 Byte의 수에 대한 Feature는 그 범위가 매우 크다. 때문에 각 클러스터로부터의 거리를 계산하여 클러스터링의 성능을 평가할 때 주고받은 Byte 수에 대한 Feature가 영향령이 크다는 것이 된다. 따라서 앞서 계산한 Euclidean Distance에서는 다른 Feature들이 거의 무시되고 주고받은 Byte 수에 대한 것이 대부분이라는 것이다. 이것을 방지하기 위해 각 Feature 값을 Normalize 할 필요가 있다.



Feature Normalization

우리는 다음과 같은 식을 이용하여 각 Feature를 Normalize할 수 있다.

\( { normalized }_{ i }=\frac { { feature }_{ i }-{ \mu }_{ i } }{ { \sigma }_{ i } } \)

위 식은 각 Feature의 값에서 평균을 뺀 후에, 그것을 표준편차로 나누어 정규화 시키는 것이다. 사실 평균을 각 데이터에서 빼는 것은 클러스터링 과정에 아무 영향을 미치지 않는다. 모든 값을 같은 양만큼 같은 뱡향으로 움직이는 것이기 때문이다. 그렇지만, 일반적으로 사용하는 정규화 식을 그대로 이용하기 위하여 평균을 빼는 과정을 제외하지는 않았다.

표준값은 각 Feature별로 갯수, 합, 제곱합을 계산함으로써 구할 수 있다. 다음과 같은 코드를 이용해 각 Feature의 값을 Normalization할 수 있다.

val dataAsArray = data.map(_.toArray)
val numCols = dataAsArray.first().length
val n = dataAsArray.count()
val sums = dataAsArray.reduce {
    (a, b) => a.zip(b).map(t => t._1 + t._2)
}

val sumSquares = dataAsArray.aggregate(
    new Array[Double](numCols)
    )(
        (a, b) => a.zip(b).map(t => t._1 + t._2 * t._2),
        (a, b) => a.zip(b).map(t => t._1 + t._2)
    )   

val stdevs = sumSquares.zip(sums).map {
    case (sumSq, sum) => math.sqrt(n * sumSq - sum * sum) / n
}

val means = sums.map(_ / n)

위 코드에서 aggregate 함수에 대해 어려움을 느낄 수 있는데, Python API를 이용해 설명하고 있지만, 이 블로그에서 잘 설명을 하고 있다. 간단히 얘기하면 aggregate 함수의 파라미터로는 초기값(0 Value)이 사용되고, 첫번째 람다함수는 결과 Object를 생성하는 과정에서 어떻게 그 결과를 만들어 낼 것인지(위 코드의 예시에서는 제곱의 합을 구하는 과정)에 대한 것이고, 두번째 람다함수는 여러개의 결과 Object를 Combine할 때 어떻게 할 것인지에 대한 함수이다. 결론적으로 위 과정에서 제곱의 합을 구하는 과정은 각 Partition에서는 zip 된 두 개의 데이터 중 Feature별로 다음 것을 제곱하여 이전의 값에 더해나가고, 그 Partition을 합할 때는 덧셈을 수행함으로써 모든 데이터의 Feature별 제곱의 합을 구한다.

앞서 구한 값들을 이용해 데이터를 정규화하고, 그 데이터를 이용해 K-means 클러스터링을 수행시킨 뒤 다시 Euclidean Distance를 계산한다.

def normalize(datum: Vector) = {
    val normalizedArray = (datum.toArray, means, stdevs).zipped.map {
    (value, mean, stdev) =>
        if (stdev <= 0) (value - mean) else (value - mean) / stdev
    }
    Vectors.dense(normalizedArray)
}

val normalizedData = data.map(normalize).cache()

(60 to 120 by 10).par.map {
    k => (k, clusteringScore(normalizedData, k))
}.toList.foreach(println)

위 코드의 수행 결과는 다음과 같다.

(60,0.35788542308188337)
(70,0.33739539223932735)
(80,0.35734816618118775)
(90,0.3333896914476046)
(100,0.2866857023435722)
(110,0.27457555082113716)
(120,0.25987997589819595)

위 결과를 통해 elbow인 \(k=100\)은 괜찮은 선택이었음을 알 수 있다. 앞서 정규화한 데이터를 다시 R을 통해 시각화하여 어떤 차이가 있는지 확인해 볼 수 있다. 다음 코드를 이용해 Normalize된 데이터를 이용하여 K-means Clustering을 수행시키고, 그것을 HDFS에 저장한다. (확실한 결과를 위해 setRun()을 통해 옵션을 정하였다.)

val kmeans = new KMeans()
kmeans.setK(100)
kmeans.setRuns(5)
kmeans.setEpsilon(1.0e-6)

val model = kmeans.run(normalizedData)

val sample = normalizedData.map(datum =>
    model.predict(datum) + "," + datum.toArray.mkString(",")
).sample(false, 0.05)

sample.saveAsTextFile("/kdd/normalized_sample")

그리고 R에서 다시 그래프를 생성하였다.

library(rgl)

#HDFS로부터 CSV형태의 데이터 읽기
clusters_data <- read.csv(pipe("hdfs dfs -cat /kdd/normalized_sample/*"))
clusters <- clusters_data[1]
data <- data.matrix(clusters_data[-c(1)])
rm(clusters_data)

#Random Unit Vector 생성
random_projection <- matrix(data = rnorm(3*ncol(data)), ncol = 3)
random_projection_norm <-
    random_projection / sqrt(rowSums(random_projection * random_projection))

projected_data <- data.frame(data %*% random_projection_norm)

num_clusters <- nrow(unique(clusters))
palette <- rainbow(num_clusters)
colors = sapply(clusters, function(c) palette[c])
plot3d(projected_data, col = colors, size = 10)

생성된 그래프는 다음과 같으며, 정규화 이전의 데이터에 비해 좀 더 풍부한(richer) 구조를 띄고 있음을 알 수 있다.

Random 3D Projection of Normalized Data



Using Labels with Entropy

지금까지 우리는 클러스터링의 성능을 판단하기 위해, 그리고 그것을 이용해 적절한 \(k\)를 찾는 과정에서 매우 간단한 것을 이용하였다. 여기에 지난 Chapter에서 사용한 방식을 적용할 수 있다. Gini inputiryEntropy방식이 그것인데, 여기서는 Entropy 방식을 이용하여 설명할 것이다.

좋은 클러스터링이라는 것은 각각의 클러스터가 하나의 label에 대한 데이터만 가지고 있고, 결과적으로 낮은 Entropy를 갖는 것을 의미한다. 때문에 여기에 가중평균을 적용하여 클러스터링의 스코어를 계산할 수 있다.*

val entropy(counts: Iterable[Int]) = {
    val values = counts.filter(_ > 0)
    val n: Double = values.sum
    values.map { v =>
        val p = v / n
        -p * math.log(p)
    }.sum
}

def clusteringScore(normLabelAndData: RDD[(String, Vector)], k:Int) = {
    val kmeans = new KMeans()
    kmeans.setK(k)
    kmeans.setRuns(5)
    kmeans.setEpsilon(1.0e-6)

    val model = kmeans.run(normLabelAndData.values)

    val labelsAndClusters = normLabelAndData.mapValues(model.predict)
    val clustersAndLabels = labelsAndClusters.map(_.swap)
    val labelsInCluster = clustersAndLabels.groupByKey().values
    val labelCounts = labelsInCluster.map(_.groupBy(l => l).map(_._2.size))
    val n = normLabelAndData.count()

    labelCounts.map(m => m.sum * entropy(m)).sum / n
}

val normalizedLabelsAndData = labelsAndData.mapValues(normalize).cache()

(80 to 160 by 10).par.map {
    k => (k, clusteringScore(normalizedLabelsAndData, k))
}.toList.foreach(println)

normalizedLabelsAndData.unpersist()


위 코드의 수행 결과는 다음과 같으며, \(k=150\)일때 가장 좋은 성능을 나타냄을 알 수 있다.**

(80,0.01633056783505308)
(90,0.014086821003939093)
(100,0.013287591809429072)
(110,0.011314751300005676)
(120,0.012290307370115603)
(130,0.00949651182021723)
(140,0.008943810114864363)
(150,0.007499306029722229)
(160,0.008704684195176402)
(170,0.008691369298104417)
(180,0.008559061207118177)

Categorical Variables

지금까지 수행한 클러스터링은 세 개의 Categorical Feature를 제외하고 수행하였다. 왜냐면, MLlib에서의 K-means Clustering은 숫자 형태의 데이터가 아닌 것에 대해서는 클러스터링을 수행할 수 없기 때문이다. Categorical Feature를 클러스터링에 포함시키기 위하여 지난 챕터에서 사용한 데이터베이스가 활용하고 있는 방법인, Feature의 값의 범위를 하나의 비트로 표현하는 방법을 이용할 것이다.

예를 들면 두 번째 Feature의 경우에는 어떤 형태의 프로토콜이 사용되었는지에 대한 값이다. 이것은 tcp, udp, icmp의 값을 갖는데, 이 것을 바이너리의 값을 갖는 세 개의 Feature, 예를들어 istcp, isudp, is_icmp와 같은 Feature로 나누어 저장하는 것이다. 그리고 어떤 데이터가 원래 데이터에서 udp 프로토콜을 사용한 데이터였다면, 0.0, 1.0, 0.0으로 표현하는 것이다.

이 방법을 데이터에 적용하면 다시 Normalize와, 클러스터링을 수행시켜야 한다. 책에는 이 과정에 대한 코드가 있지 않지만, Github Repository에 해당 코드가 있어 사용하였다.

import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd._

def buildCategoricalAndLabelFunction(rawData: RDD[String]): (String => (String,Vector)) = {
  val splitData = rawData.map(_.split(','))
  val protocols = splitData.map(_(1)).distinct().collect().zipWithIndex.toMap
  val services = splitData.map(_(2)).distinct().collect().zipWithIndex.toMap
  val tcpStates = splitData.map(_(3)).distinct().collect().zipWithIndex.toMap
  (line: String) => {
    val buffer = line.split(',').toBuffer
    val protocol = buffer.remove(1)
    val service = buffer.remove(1)
    val tcpState = buffer.remove(1)
    val label = buffer.remove(buffer.length - 1)
    val vector = buffer.map(_.toDouble)

    val newProtocolFeatures = new Array[Double](protocols.size)
    newProtocolFeatures(protocols(protocol)) = 1.0
    val newServiceFeatures = new Array[Double](services.size)
    newServiceFeatures(services(service)) = 1.0
    val newTcpStateFeatures = new Array[Double](tcpStates.size)
    newTcpStateFeatures(tcpStates(tcpState)) = 1.0

    vector.insertAll(1, newTcpStateFeatures)
    vector.insertAll(1, newServiceFeatures)
    vector.insertAll(1, newProtocolFeatures)

    (label, Vectors.dense(vector.toArray))
  }
}

def buildNormalizationFunction(data: RDD[Vector]): (Vector => Vector) = {
  val dataAsArray = data.map(_.toArray)
  val numCols = dataAsArray.first().length
  val n = dataAsArray.count()
  val sums = dataAsArray.reduce(
    (a, b) => a.zip(b).map(t => t._1 + t._2))
  val sumSquares = dataAsArray.aggregate(
    new Array[Double](numCols)
    )(
    (a, b) => a.zip(b).map(t => t._1 + t._2 * t._2),
    (a, b) => a.zip(b).map(t => t._1 + t._2)
    )
    val stdevs = sumSquares.zip(sums).map {
      case (sumSq, sum) => math.sqrt(n * sumSq - sum * sum) / n
    }
    val means = sums.map(_ / n)

    (datum: Vector) => {
      val normalizedArray = (datum.toArray, means, stdevs).zipped.map(
        (value, mean, stdev) =>
        if (stdev <= 0)  (value - mean) else  (value - mean) / stdev
        )
      Vectors.dense(normalizedArray)
    }
}

def entropy(counts: Iterable[Int]) = {
  val values = counts.filter(_ > 0)
  val n: Double = values.sum
  values.map { v =>
    val p = v / n
    -p * math.log(p)
    }.sum
}

def buildNormalizationFunction(data: RDD[Vector]): (Vector => Vector) = {
  val dataAsArray = data.map(_.toArray)
  val numCols = dataAsArray.first().length
  val n = dataAsArray.count()
  val sums = dataAsArray.reduce(
    (a, b) => a.zip(b).map(t => t._1 + t._2))
  val sumSquares = dataAsArray.aggregate(
    new Array[Double](numCols)
    )(
      (a, b) => a.zip(b).map(t => t._1 + t._2 * t._2),
      (a, b) => a.zip(b).map(t => t._1 + t._2)
    )
  val stdevs = sumSquares.zip(sums).map {
    case (sumSq, sum) => math.sqrt(n * sumSq - sum * sum) / n
  }
  val means = sums.map(_ / n)

  (datum: Vector) => {
    val normalizedArray = (datum.toArray, means, stdevs).zipped.map(
      (value, mean, stdev) =>
      if (stdev <= 0)  (value - mean) else  (value - mean) / stdev
      )
    Vectors.dense(normalizedArray)
  }
}

def clusteringScore(normalizedLabelsAndData: RDD[(String,Vector)], k: Int) = {
  val kmeans = new KMeans()
  kmeans.setK(k)
  kmeans.setRuns(10)
  kmeans.setEpsilon(1.0e-6)

  val model = kmeans.run(normalizedLabelsAndData.values)
  val labelsAndClusters = normalizedLabelsAndData.mapValues(model.predict)
  val clustersAndLabels = labelsAndClusters.map(_.swap)
  val labelsInCluster = clustersAndLabels.groupByKey().values
  val labelCounts = labelsInCluster.map(_.groupBy(l => l).map(_._2.size))
  val n = normalizedLabelsAndData.count()

  labelCounts.map(m => m.sum * entropy(m)).sum / n
}

val rawData = sc.textFile("/kdd/kddcup.data.corrected")

val parseFunction = buildCategoricalAndLabelFunction(rawData)
val labelsAndData = rawData.map(parseFunction)
val normalizedLabelsAndData =
    labelsAndData.mapValues(buildNormalizationFunction(labelsAndData.values)).cache()

val result = (80 to 160 by 10).map(k =>
  (k, clusteringScore(normalizedLabelsAndData, k))).toList
result.foreach(println)

normalizedLabelsAndData.unpersist()

위 코드의 수행 결과는 다음과 같다. ***

(80,0.02779435579758084)
(90,0.05653844879893403)
(100,0.029429111090986747)
(110,0.022128545398091923)
(120,0.022724916386673424)
(130,0.02103069110661259)
(140,0.01920591910565662)
(150,0.019929832533142584)
(160,0.019637563306766435)

이로부터 \(k=140\)일때가 최적의 \(k\)임을 알 수 있다.



Clustering in Action

이제, 남은 과정은 네트워크 트래픽 중에서 비정상적인 트래픽을 감지해 내는 것이다.

이 장의 처음 부분에서 했던 것과 마찬가지로 현재 \(k=140\)으로 클러스터링을 진행하였을 때의 각 클러스터별 데이터 레이블의 상태를 확인해 볼 수 있다.

val rawData = sc.textFile("/kdd/kddcup.data.corrected")

val parseFunction = buildCategoricalAndLabelFunction(rawData)
val labelsAndData = rawData.map(parseFunction)
val normalizedLabelsAndData =
    labelsAndData.mapValues(buildNormalizationFunction(labelsAndData.values))
val normalizedData = normalizedLabelsAndData.values.cache()

val kmeans = new KMeans()
kmeans.setK(140)
kmeans.setRuns(10)
kmeans.setEpsilon(1.0e-6)

val model = kmeans.run(normalizedData)
val labelsAndClusters = normalizedLabelsAndData.mapValues(model.predict)
val clustersAndLabelsCount = labelsAndClusters.map(_.swap).countByValue

clustersAndLabelsCount.toSeq.sorted.foreach {
    case ((cluster, label), count) =>
    println(f"$cluster%1s$label%18s$count%8s")
}

normalizedData.unpersist()

위 코드를 이용하면 다음과 같은 결과를 볼 수 있다. 처음 확인하였던 결과와는 다르게, 각 클러스터별로 최소 하나 이상의 레이블 데이터가 있으며, 한 레이블이 높은 비율을 차지하고 있다는 것을 알 수 있다.

0          neptune.  362825
0        portsweep.       9
1           normal.      13
1        portsweep.     645
2          neptune.     200
2        portsweep.       6
2            satan.       1
3          neptune.    1037
3        portsweep.      13
3            satan.       3
4             back.      43
4           normal.   63268
...
138           normal.    1005
139  buffer_overflow.       6
139        ftp_write.       4
139          ipsweep.      13
139         multihop.       3
139           normal.   35974
139        portsweep.       1
139          rootkit.       1
139      warezclient.     701
139      warezmaster.      18

이런 결과를 갖는 클러스터링 모델에서 비정상적인 트래픽을 찾아낼 것인데, 그 과정은 다음과 같다.

트레이닝한 데이터별로 각 그 데이터가 속하는 클러스터의 중심점과의 거리를 계산하고(이 과정은 앞서 진행하였다.), 그 길이들은 내림차순으로 정렬하여 100번째의 거리값을 Threshold로 정한다. 그리고 전체 데이터중 중심점과의 Threshold를 초과하는 것들을 비정상적인 데이터로 간주할 것이다. 그러기 위해선 데이터를 다시 normalize해야 한다.(앞선 과정에서 진행하였지만, 앞서 진행한 코드를 그대로 적용할 수 없어서 다시 계산한다.) 이어서 Threshold를 계산한 다음에 Threshold를 이용하여 각 데이터를 필터링한다.

val distances = normalizedData.map(datum => distToCentroid(datum, model))
val threshold = distances.top(100).last
val originalAndParsed = rawData.map(line => (line, parseFunction(line)._2))

val dataAsArray = originalAndParsed.map(line => line._2.toArray)
val numCols = dataAsArray.first().length
val n = dataAsArray.count()
val sums = dataAsArray.reduce {
    (a, b) => a.zip(b).map(t => t._1 + t._2)
}

val sumSquares = dataAsArray.aggregate(
    new Array[Double](numCols)
    )(
        (a, b) => a.zip(b).map(t => t._1 + t._2 * t._2),
        (a, b) => a.zip(b).map(t => t._1 + t._2)
    )   

val stdevs = sumSquares.zip(sums).map {
    case (sumSq, sum) => math.sqrt(n * sumSq - sum * sum) / n
}

val means = sums.map(_ / n)
def normalize(datum: Vector) = {
    val normalizedArray = (datum.toArray, means, stdevs).zipped.map {
    (value, mean, stdev) =>
        if (stdev <= 0) (value - mean) else (value - mean) / stdev
    }
    Vectors.dense(normalizedArray)
}

val originalAndNormalized = originalAndParsed.mapValues(normalize)
val anormalies = originalAndNormalized.filter{
  case (original, normalized) => distToCentroid(normalized, model) > threshold
}.keys
anormalies.take(10).foreach(println)

재미있게도 다음 결과가 제일 처음으로 나왔는데, 레이블은 정상적인 트래픽이라는 레이블이다. 하지만 이 정상적이라는 것은 보안에 영향을 미치는지에 대한 여부를 뜻하기 때문에 일반적인 네트워크 트래픽과 비교해 봤을때는 정상 트래픽과는 거리가 있다. 연결은 성공하였지만 그 이후로 아무것도 데이터가 전송되지 않은 것을 의미하는 S1 플래그이며, 짧은 순간 동안 30번 가량의 연결이 존재하였다. 이것으로부터 이 데이터는 악의적인 연결은 아니지만 충분히 정상적이지 않은 연결에 대한 것임을 알 수 있다.

0,tcp,telnet,S1,145,13236,0,0,0,0,0,1,31,1,2,38,0,0,0,0,0,0,1,1,1.00,1.00,0.00,0.00,1.00,0.00,0.00,29,10,0.28,0.10,0.03,0.20,0.07,0.20,0.00,0.00,normal.




* 이 과정부터 맥북에서 수행시켰을 때 7시간 이상이 걸려, Amazon EMR을 이용하여 계산하였다.

*, ** 이 결과는 책의 결과와 많이 다르다. 이것 때문에 책의 저자와 이메일을 주고받았는데, 책의 Spark 버전과 이 글을 쓸 때의 Spark 버전과의 차이가 있어서 인것으로 잠정 결론지었다.

Read More...
Spark & 머신 러닝 - Predicting Forest Cover

이 포스트는 Decision Tree를 이용해서 미국 콜로라도의 한 지역을 덮고 있는 숲의 종류를 예측하는 과정에 대한 글이다.

이 포스트는 Advanced Analytics with Spark을 정리한 글이다.

Regression and Classification

Regression은 숫자 형태의 데이터를 이용하여 또 다른 값을 예측하는 것이고, Classification은 레이블이나 카테고리등을 예측하는 것을 이야기 한다. 두 가지의 공통점은 주어진 하나 이상의 값을 이용하여 새로운 값을 예측해 낸다는 것이고 예측을 수행하기 위한 데이터와, 어떤 값을 예측할 것인지에 대한 'known answers'가 필요하다. 그렇기 때문에 Regression과 Classification은 Machine learning 에서 supervised learning에 속한다.

두 가지 모두 predictive analytics 에서 가장 오래되고 연구가 많이 된 방법이다. 머신러닝 관련된 라이브러리에서 대부분의 분석 알고리즘은 regression 혹은 classification 방법이다. 예를 들면 Support Vector Machine, Logistic Regression, Naive Bayes Classifier, Neural Network, Deep Learning 등이 있다.

이 챕터(포스트)에서는 이러한 여러 알고리즘 중 가장 간단하면서도 확장성이 좋은 Decision Tree와, 그것의 확장형인 Random Decision Forest(Random Forest라고도 한다.)를 이용할 것이다.



Decision Trees and Forest

앞서 설명한 Decision Tree는 숫자 형태의 데이터 뿐만 아니라 categorical 한 Feature에 대해서도 사용 가능하다. 이 Decision Tree가 좀 더 일반화되어 좀 더 강력한 성능을 제공하는 것이 Random Decision Forest라고 불린다. 이 두 가지가 Spark의 MLlib에 DecisionTreeRandomForest로 구현되어 있으며 이것을 데이터 셋에 적용하여 테스트 할 것이다.

Decision Tree 알고리즘은 직관적으로 이해하기 비교적 쉽다는 장점이 있다. 실제로 우리는 Decision Tree에 내재되어 있는 방식과 똑같은 방식을 실제로 사용하고 있기도 하다. 예를 들면 아침에 커피와 우유를 섞기 전에 우유가 상했는지 예측을 할 때 우리는 일련의 과정을 거친다. 먼저, 유통기한이 지났는가? 지나지 았았다면 상하지 않았을 것이고 날짜가 3일이상 지났다면 우유가 상했을 것이라 예측한다. 그렇지 않다면 우유에 냄새를 맡고 냄새가 이상하면 상했고, 그렇지 않다면 상하지 않았다고 판단할 것이다.

Decision Tree Example

Decision Tree에는 위 다이어그램과 같은 일련의 Yes/No 로 구성된 선택들이 내재되어 있다. 각각의 질문은 예측 결과 혹은 또 다른 결정문제를 유도한다. 이 Decision Tree의 각각의 노드는 결정문제가 되고, 각 leaf는 최종 예측 결과가 된다.



Preparing the Data

이 장에서 사용할 데이터셋은 Covtype이라는 데이터셋으로 이 링크에서 다운로드 할 수 있다. 다운로드 후 압축을 해제하면 생성되는covtype.data 파일이 데이터파일이다.

데이터셋은 미국 콜로라도주의 특정 지역을 덮고 있는 숲의 종류에 대해 기록이 정리되어있다. 각 행은 해당 지역에 존재하는 여러 부분들을 여러 개의 Feature로 표현한다. 고도, 경사도, 토양의 종류 등등이 Feature에 해당된다. 이렇게 표현되는 지역들을 덮고 있는 숲은 Feature로부터 예측 가능하며 총 54 가지의 Feature가 존재한다. 그리고 약 580000개의 데이터가 존재한다. 비록 데이터가 크지 않아 빅 데이터라 할 수는 없지만 우리가 Decision Tree를 생성하고 그 결과를 확인해 보는 데에는 적당하다.

데이터는 다음과 같은 내용으로 구성되어 있다.

Covtype Dataset Description

고맙게도 이 데이터는 CSV형태로 되어있기 때문에 Spark MLlib을 이용하기 위해 데이터를 정제하는 과정이 거의 필요하지 않다. covtype.data파일을 HDFS에 업로드할 때, 이 포스트에서는 /covtype/covtype.data로 업로드 하는 것을 가정한다.

Spark의 Mllib은 Feature의 Vector를 LabeledPoint 객체로 추상화하여 사용한다. Vector 형태로 저장되어 있는 Feature 값과 구하고자 하는 target인 Label로 구성된 형태이다. LabeledPoint는 숫자 형태의 값만을 이용하는 것으로 제한되며 target은 Double 형태로 지정된다. 때문에 만약 Categorical한 Feature(숫자가 아닌 값들)를 사용하고자 할 때는 적절한 값으로 인코딩을 해 주어야 한다.

한 가지 인코딩 방법은 \(N\)개의 값은 갖는 Categorical한 Feature를 \(N\)개의 0과 1의 조합을 통해 서로 다른 값을 갖게하는 것이다. 예를 들어 날씨에 대한 Feature가 갖는 값의 종류는 cloudy, rainy, clear 일 때, cloudy는 1,0,0으로, rainy는 0,1,0과 같은 값은 갖는 형태로 인코딩을 할 수 있다. 이 포스트에서 사용하는 데이터 셋은 이 방법으로 구성되어있다. 다른 방법으로는 각각의 값에 특정 숫자(1, 2, ...)를 부여하는 방법이 있다. 이 챕터의 뒷 부분에서는 전자의 방식을 후자의 방식으로 바꿔서도 실험을 수행할 것이다.

HDFS에 업로드한 데이터를 MLlib에서 사용하기 위해 다음과 같이 데이터를 준비한다.

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression._

val rawData = sc.textFile("/covtype/covtype.data")

val data = rawData.map { line =>
    val values = line.split(",").map(_.toDouble)
    val featureVector = Vectors.dense(values.init)
    val label = values.last - 1
    LabeledPoint(label, featureVector)
}

데이터의 구성을 보면 알 수 있듯이 각 행의 마지막 값이 우리가 최종적으로 알아내고자 하는 숲의 종류임을 알 수 있다. 따라서 위 코드에서처럼 featrueVector를 생성할 때 init()을 이용해 가장 마지막 값은 제외한 벡터를 feature vector로 한다. 그리고 MLlib에서의 Decition Tree의 Label은 값이 0 부터 시작하므로 데이터의 가장 마지막 값은 1을 뺀다.

또한 다음 코드를 이용해 이전 포스트에서 수행하였던 것처럼 모든 데이터를 Decition Tree를 생성하는데 사용하지 않고, 80% 데이터를 트레이닝에 사용하고, 10%를 Cross-validation, 나머지 10%를 테스트용으로 사용할 것이다.

val Array(trainData, cvData, testData) = data.randomSplit(Array(0.8, 0.1, 0.1))
trainData.cache()
cvData.cache()
testData.cache()



A First Decision Tree

다음 코드를 이용해 Decision Tree를 트레이닝시킨다.

import org.apache.spark.mllib.tree._

val model = DecisionTree.trainClassifier(trainData, 7, Map[Int, Int](), "gini", 4, 100)

ALS를 이용한 추천 모델 생성에서도 그러하였듯이, trainClassifier 함수를 이용해 트레이닝을 할 때에도 몇 가지 필수 Hyperparameter가 사용된다. 첫번째 파라미터는 어떤 데이터를 이용하여 Classifer를 훈련시킬 것인지에 대한 것이고, 두 번째 파라미터는 우리가 최종적으로 예측하고자 하는 값의 종류가 몇가지 인지를 나타낸다. Covtype Dataset에서 우리가 예측하고자 하는 숲의 종류는 7가지 이므로 7이 파라미터로 이용된다. 다음 Map인자는 categorical feature에 대한 정보를 갖고 있는데, 이것은 나중에 gini파라미터의 의미와 함께 설명할 것이다. 그리고 다음 인자는 4로 Tree의 최대 깊이를 의미하며 마지막 100 파라미터는 Feature가 연속한 값을 가질 때 최대 몇 개의 bin 까지 나눌 수 있는가에 대한 것이다.

생성한 Decision Tree 모델이 얼마만큼의 정확도를 갖는지는 다음과 같은 코드를 이용해 Confusion Matrix를 계산함으로써 알 수 있다. 이 계산에서는 Cross-Validation 데이터를 이용하였다.

import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.tree.model._
import org.apache.spark.rdd._

def getMetrics(model: DecisionTreeModel, data: RDD[LabeledPoint]): MulticlassMetrics = {
    val predictionsAndLabels = data.map( example =>
        (model.predict(example.features), example.label)
    )
    new MulticlassMetrics(predictionsAndLabels)
}

val matrics = getMetrics(model, cvData)
matrics.confusionMatrix

Confusion Matrix는 다음과 같다. (랜덤성을 갖고 있기 때문에 수행 결과가 다를 수 있다.)

15767.0  5223.0   3.0     0.0  0.0   0.0  380.0
6761.0   21073.0  447.0   0.0  8.0   0.0  31.0
0.0      714.0    2836.0  0.0  0.0   0.0  0.0
0.0      0.0      292.0   0.0  0.0   0.0  0.0
16.0     865.0    24.0    0.0  16.0  0.0  0.0
0.0      428.0    1287.0  0.0  0.0   0.0  0.0
1134.0   20.0     0.0     0.0  0.0   0.0  914.0

우리가 예측하고자 하는 목표 값이 7가지 값을 갖기 때문에 Confusion Matrix는 7 X 7 의 행렬을 갖는다. 또한 위 행렬의 각 행은 실제 정답을 의미하고, 각 열은 Decision Tree가 예측한 값을 의미한다. 따라서 행 \(i\), 열 \(j\)의 숫자는 Decision Tree가 \(j\)로 예측하였고 실제 그 값이 \(i\)인 개수를 나타낸다. 때문에 Decision Tree가 정확히 맞춘 것은 diagonal(행과 열의 인덱스가 같은 것)에 존재하는 것들이며 그것을 제외한 나머지들은 에러이다. 위 결과에서는 category 3과 5와 같은 경우에는 제대로 맞춘 것이 하나도 없는 결과를 보인다.

다음 코드를 수행하면 간단히 정확도를 계산할 수 있다.

metrics.precision

...

Double = 0.6972303782688576

약 0.7의 값을 보이는데 이 값은 실제 Precision라는 값을 계산한 것이다. 일반적으로 정확도라고 불리기도 하지만 binary classification에서 주로 사용되는 성능 측정 방법이다. positivenegative 두 클래스로 데이터를 분류하는 문제에서 Precision은 Classifier가 Positive라고 판단한 것 중에 실제 Poistive가 차지하는 비율이다. 보통 이 Precision은 Recall과 함께 언급된다. Recall은 실제 Positive 값 중에 Classifier가 Positive라 판단한 것의 비율이다.

예를 들면 50개의 데이터 중에 20개가 Positive인 데이터셋이 있고, Classifier가 50개 중에 10개를 Positive라 판단하였는데 그 10개 중에 4개가 실제 Positive(제대로 Classify 한것을 의미) 데이터라 한다면 Precision은 \( \frac { 4 }{ 10 } = 0.4\) 이며, Recall은 \( \frac { 4 }{ 20 } = 0.2\) 이다. 이것을 Multi-class classification에 적용하여 각각의 카테고리를 Positive라 가정하고 각 수행에서 나머지 카테고리를 Negative라 하여 Precision Recall을 계산할 수 있다. 이런 계산 결과들이 위에서 생성한 MulticlassMatrics클래스에 들어있다. 각 카테고리별로 Recall과 Precision이 얼마나 되는지 다음 코드를 통해 확인할 수 있다.

(0 until 7).map( //Category 는 0부터 시작하기때문에
    category => (metrics.precision(category), metrics.recall(category))
).foreach(println)

수행 결과는 다음과 같다. (수행시마다 다를 수 있다.)

(0.665892389559929,0.7377064520656904)
(0.7440242912120891,0.7441031073446328)
(0.5800777255062385,0.7988732394366197)
(0.0,0.0)
(0.6666666666666666,0.01737242128121607)
(0.0,0.0)
(0.689811320754717,0.44197292069632493)

처음 계산한 정확도인 0.7은 꽤나 괜찮은 결과처럼 보이지만 0.7이라는 정확도가 얼마나 정확한 것인지 판단하기 어렵다. 따라서 그것을 구분하기 위한 기준선이 필요하다. 다음과 같은 함수를 이용하여 인자로 받는 데이터에서 Classifier가 무작위로 클래스를 선택한다고 가정후에 그 것에 대한 정확도를 계산할 수 있다.

def classProbabilities(data: RDD[LabeledPoint]) : Array[Double] = {
    val countsByCategory = data.map(_.label).countByValue()
    val counts = countsByCategory.toArray.sortBy(_._1).map(_._2)
    counts.map(_.toDouble / counts.sum)
}

Classifier가 무작위로 클래스를 선택하기 때문에 각 데이터에 대해서 Classifier가 정확한 클래스를 맞출 할 확률은 데이터에서 각 클래스가 차지하는 비율과 같다. 위 함수를 이용하여 다음과 같이 트레이닝 데이터와 Cross-validation 데이터에서 무작위로 클래스를 선택하는 Classifier의 정확도를 계산할 수 있다.

val trainPriorProbabilities = classProbabilities(trainData)
val cvPriorProbabilities = classProbabilities(cvData)
trainPriorProbabilities.zip(cvPriorProbabilities).map {
    case (trainProb, cvProb) => trainProb * cvProb
}.sum

...

Double = 0.37742100814227625

위 결과를 통해 Random Gessing 은 37%의 정확도를 가짐을 알 수 있다. 때문에 우리가 앞서 구한 70%의 정확도는 괜찮은 결과임을 알 수 있다. 하지만 이것은 Tree.trainClassifier()의 기본 파라미터만을 이용하여 계산한 것이기 때문에 Hyperparameter를 조절하여 더욱 성능을 끌어올릴 수 있다.



Decision Tree Hyperparameters

앞서 설명하였듯이 Decision Tree에는 세 가지의 Hyperparameter로 최대 깊이, 최대 bin의 수, impurity measure가 존재한다.

Decision Tree의 최대 깊이는 Tree의 높이를 이야기한다. Tree의 깊이가 깊다는 것은 데이터를 분류하는데 까지의 과정이 세밀함을 의미하며, 깊이가 깊을 때는 좀 더 정확하게 분류를 할 수 있지만 Overfitting의 위험성이 있다.

Decision Tree는 각각의 질의 단계에서 각 단계에 맞는 decision을 내려야 한다. 이때 결정을 내리기 위한 질문들은 숫자 형태의 Feature인 경우에는 feature >= value와 같은 형태이며, Categorical한 Feature인 경우에는 feature in (value1, value2, ...)형태를 갖는다. 이러한 decision rule의 셋이 각 단계에서 decision을 내리는 데에 사용된다. 이것들을 Spark MLlib에서는 bin이라 부른다. 사용되는 bin의 수가 클수록 계산 시간은 길어지지만 좀 더 최적화된 결과를 낼 수 있다.

Decision Tree를 형성하는데에 있어 좋은 Decision Rule이라 함은 데이터를 명확하게 구분짓는 Rule 일것이다. 예를 들면 어떤 Rule이 Covtype Dataset을 정확히 클래스 1~3과 4~7로 나눈다면 그 Rule은 매우 좋은 Rule이라 할 수 있다. 결론적으로 좋은 Rule을 고른다는 것은 데이터를 두 개의 서브셋으로 나눌 때, 그 사이의 불순물이 적도록 선택하는 것이다. 때문에 어떤 방식으로 불순물이 많은지 판단할지가 매우 중요한데, 일반적으로 많이 사용하는 방식은 Gini방식과 Entropy방식이다.

Gini 방식은 앞서 설명하였던 무작위로 클래스의 레이블을 선택하는 과정과 비슷하다. Gini impurity(불순도) 측정 방식은 어떤 한 클래스를 골라서 무작위로 라벨을 추정하였을 때, 그 추정이 틀릴 확률을 이용하여 계산된다. 만약 이 불순도가 0이 된다면 현재 데이터는 완벽히 한 클래스만 존재하여 정확히 분류가 된 것을 의미한다. 그리고 데이터의 모든 클래스가 같은 비율로 존재할 때 불순도가 제일 높다. 이러한 Gini 불순도 \(I_G\) 는 \( i = 1, 2, ..., N \) 클래스와, 각 클래스가 데이터 중에서 차지하는 비중인 \(p_i\)를 이용해 다음과 같이 표현 가능하다.

\( I_G(p) = \sum _{ i=1 }^{ N }{ { p }_{ i }(1-{ p }_{ i }) } = 1 - \sum _{ i=1 }^{ N }{ { { p }_{ i } }^{ 2 } } \)

Entropy 방식은 Information Theory에서 나온 방법이다. 식이 유도되는 방식을 이야기 하기에는 어렵지만 목표하는 클래스의 서브셋이 얼마만큼의 불확실성을 갖는가에 대한 것이며, 다음과 같이 정의된다.

\( { I }_{ E }(p)=\sum _{ i=1 }^{ N }{ { p }_{ i }\log { \frac { 1 }{ p } } } = -\sum _{ i=1 }^{ N }{ { p }_{ i }log({ p }_{ i }) } \)

데이터셋에 따라 어느 측정 방식이 좋은지는 다르며, Spark의 MLlib 에서는 Gini방식이 기본값이다. 몇몇의 Decision Tree는 Minimum Information Gain이라는 방식을 이용하는데 아직 이 방식은 MLlib에 구현되어 있지 않다.



Tuning Decision Trees

위에서 설명한 파라미터를 조절하여 처음 생성한 Decision Tree 모델보다 성능을 개선시킬 것이다. 영화 추천 모델에서와 마찬가지로 다음 코드와 같이 간단한 형태의 파라미터 조절 실험이 가능하다.

val evaluations = 
    for(impurity <- Array("gini", "entropy");
        depth <- Array(1, 20, 30);
        bins <- Array(10, 300))
    yield {
        val model =
            DecisionTree.trainClassifier(trainData, 7, Map[Int, Int](), impurity, depth, bins)
        val predictionsAndLabels = cvData.map(example =>
            (model.predict(example.features), example.label)
        )
        val accuracy = new MulticlassMetrics(predictionsAndLabels).precision
        ((impurity, depth, bins), accuracy)
    }

evaluations.sortBy(_._2).reverse.foreach(println)

위 코드의 수행 결과는 다음과 같다. 물론 세부적인 수치는 다를 수 있다.

((entropy,30,300),0.938073512955152)
((gini,30,300),0.9331152621158647)
((entropy,30,10),0.9135060686924334)
((gini,30,10),0.9122320736851166)
((entropy,20,300),0.9077386588620125)
((gini,20,300),0.9051734526986314)
((entropy,20,10),0.895394680210037)
((gini,20,10),0.8903675647757596)
((gini,1,300),0.6380304725832832)
((gini,1,10),0.6375484204183525)
((entropy,1,300),0.48788843935611603)
((entropy,1,10),0.48788843935611603)

이 결과로부터 Decision Tree의 깊이를 1로 하는 것은 결과를 생성하는데 부족하다는 것을 알 수 있다. 또한, bin의 수는 높을 수록 좋은 정확도를 보이는 경향을 보였다. 그리고 불순도를 판단하는 방법은 Entropy 방식이 조금 더 좋은 결과를 보였다. 결론적으로 Entropy방법, 최대 깊이 30, 300개의 bin을 이용하여 Decition Tree를 생성하였을 때 가장 높은 성능인 93.8%의 성능을 보였다.

여기서 우리는 찾아낸 파라미터들이 Overfitting 된 것이 아닌지 판단을 해 보아야 한다. 이것은 다음과 같이 위에서 구한 파라미터를 이용하여 트레이닝 데이터와 Cross-Validation 데이터를 이용하여 Decision Tree를 생성하였을 때의 성능을 확인함으로써 판단 가능하다.

val model = DecisionTree.trainClassifier(
    trainData.union(cvData), 7, Map[Int, Int](), "entropy", 30, 300)
val accuracy = getMetrics(model, testData).precision

...

accuracy: Double = 0.9427713442521098

위 코드의 수행 결과는 94.2%의 정확도를 보인다. 만약 앞서 얻은 파라미터가 Overfitting된 파라미터였다면 Cross-Validation데이터를 union한 데이터에 대한 Decision Tree의 정확도는 더 떨어졌을 것이다. 하지만 앞서 수행하였던 결과보다 데이터를 추가하여 Decision Tree 트레이닝 하였고, 성능이 증가하였다. 따라서 앞에서 얻은 파라미터가 트레이닝 데이터에 Overfitting되지 않은 파라미터임을 알 수 있다.



Categorical Features Revisited

지금까지 작성한 예제 코드에서는 Map[Int, Int]() 파라미터에 대한 설명을 거의 하지 않고 진행하였다. 이 파라미터는 7과 같이 각각의 Categorical한 Feature의 값의 가지 수에 대한 것이다. Map의 key는 입력 벡터의 인덱스를 나타내며, value들은 벡터의 각 인덱스에 존재하는 Categorcal한 값의 가지 수를 나타낸다. 때문에 비어있는 Map()이 파라미터로 전달되었을 때는 Categorical한 값이 없음을 나타내며, 모든 값이 숫자 형태의 데이터임을 의미한다.

다행히도 지금까지 이용한 covtype 데이터셋은 Categorical Feature를 여러 개의 Numeric Feature를 이용하여 표현하고 있다. 이때, 0과 1을 이용한 binary 형태의 값 표현이기 때문에 Decision Tree를 구성할 때 큰 문제가 생기지는 않는다. 하지만 당연히도, 이러한 형태의 Categorical Feature에 대한 표현은 Decision Tree를 구성할 때 하나의 Categorical Feature를 구분하기 위해 그 Feature를 표현하는데 사용되는 모든 Numeric Feature를 고려하여 Decision Rule을 구성하게된다. 이때, 메모리 사용량도 증가할 것이며 속도 역시 감소할 것이다.

이러한 것을 피하는 방법은 다음과 같이 여러 개의 Numeric Feature로 구성되어 있는 하나의 Categorical Feature를 통합하여 재구성하는 것이다.

val data = rawData.map { line =>
    val values = line.split(',').map(_.toDouble)
    val wilderness = values.slice(10, 14).indexOf(1.0).toDouble
    val soil = values.slice(14, 54).indexOf(1.0).toDouble
    val featureVector = Vectors.dense(values.slice(0, 10) :+ wilderness :+ soil)
    val label = values.last - 1
    LabeledPoint(label, featureVector)
}

위처럼 기존에 Categorical Feature를 여러 개의 Numeric Feature로 표현하던 것을 하나의 Feature로 변경한다. 그리고 다음의 코드에서처럼 DecisionTree를 생성할 때 Map()에 해당 Feature의 인덱스와 그 값의 가지 수를 함께 전달하여 Categorical Feature를 그대로 활용하도록 한다. 다만, bin의 수는 반드시 Categorical Feature의 가지 수 보다 커야 하기 때문에 bind르 40이상으로 설정하여 수행한다.

val Array(trainData, cvData, testData) = data.randomSplit(Array(0.8, 0.1, 0.1))
trainData.cache()
cvData.cache()
testData.cache()

val evaluations = 
    for(impurity <- Array("gini", "entropy");
        depth <- Array(1, 20, 30);
        bins <- Array(40, 300))
    yield {
        println((impurity, depth, bins))
        val model =
            DecisionTree.trainClassifier(trainData, 7, Map(10 -> 4, 11 -> 40), impurity, depth, bins)
        val predictionsAndLabels = cvData.map(example =>
            (model.predict(example.features), example.label)
        )
        val trainAccuracy = getMetrics(model, trainData).precision
        val cvAccuracy = getMetrics(model, cvData).precision
        ((impurity, depth, bins), (trainAccuracy, cvAccuracy))
    }

evaluations.sortBy(_._2._2).reverse.foreach(println)

위 예제의 수행 결과는 다음과 같다.

((entropy,30,300),(0.9997400744976563,0.9424945022597011))
((entropy,30,40),(0.9995381489007944,0.9389967273293969))
((gini,30,300),(0.9997142967618867,0.9377153642361171))
((gini,30,40),(0.9996197783973981,0.9343734307631036))
((entropy,20,300),(0.9675436825214062,0.9258194663295873))
((gini,20,40),(0.9666049433104628,0.9238974216896677))
((gini,20,300),(0.9692987166983876,0.9238627902547142))
((entropy,20,40),(0.966656498782002,0.9227892157711555))
((gini,1,300),(0.6335093379847826,0.6374261917542553))
((gini,1,40),(0.6328541538673048,0.6369413516649063))
((entropy,1,300),(0.4876546127110015,0.4874201312531385))
((entropy,1,40),(0.4876546127110015,0.4874201312531385))

위 결과는 트레이닝 셋에 대한 수행 결과는 Overfitting 된 결과를 보이지만 Cross-Validation 데이터를 이용해 정확도를 판별하였을 때에는 데이터를 변환하지 않았을 때의 성능과 비슷한 성능을 보였다. 또한 다음과 같이 트레이닝 셋에 Cross-Validation 데이터를 포함한 것을 이용해 Decision Tree를 훈련시켜 그 결과도 확인하였다. 그 결과 역시 변경 전의 결과와 비슷한 성능을 보였다. 하지만 두 경우 모두 약간의 성능 증가는 존재하였다.

val model = DecisionTree.trainClassifier(trainData.union(cvData), 7, Map(10 -> 4, 11 -> 40), "entropy", 30, 300)
val accuracy = getMetrics(model, testData).precision
...

accuracy: Double = 0.9450667034844589

(실제 책에서는 3% 차이로 더 큰 차이가 발생하였으나, 이 글 작성시 수행한 실험에서는 큰 차이가 나지 않았다.)



Random Decision Forests

만약 위 코드를 그대로 따라왔다면, 직접 수행한 결과와 이 포스트에 있는 수행 결과가 약간 다를 것이다. 그 이유는 Decision Tree가 가지고 있는 랜덤성 때문이다. Decision Tree 알고리즘이 각 단계에서 가능한 모든 경우의 수를 고려하게되면 수많은 시간이 필요하기 때문에 그렇게 할 수 없다. 만약 하나의 Categorical Feature가 \(N\)가지의 경우를 갖는다면 \({ 2 }^{ N } - 2\)의 Decision Rule이 존재한다. 따라서 \(N\)이 커진다면 가능한 Decision Rule의 수 역시 기하급수적으로 증가하게 될 것이다.

그래서 모든 경우의 수를 확인하는 방법 대신에, 몇몇 휴리스틱한 방법을 이용하여 성능을 증가시킨다. 그 중 하나는 배깅(Bagging)이라는 방식인데, 한 개의 Decision Tree가 아닌 여러 독립적인 Decision Tree를 이용하여 각 Tree가 특정 결과를 생성하고, 그 각 결과의 평균치 등을 이용하여 예측치를 결정하는 것이다. 단순히 이야기 하면 Voting 방식이다. 이러한 방식은 단순히 하나의 Decision Tree만으로 결과를 내는 것보다 정확한 결과를 낼 확률이 높으며, 이때 여러 독립적인 Decision Tree를 생성하는 과정에서 랜덤성(Randomness)이 활용되고, 이것이 Random Decision Tree의 주요 아이디어가 된다.

다음과 같이 Spark MLlib의 RandomForest를 이용하여 쉽게 Random Decision Forest를 생성할 수 있다.

val forest = RandomForest.trainClassifier(
    trainData, 7, Map(10 -> 4, 11 -> 40), 20, "auto", "entropy", 30, 300)

Random Decision Forest는 새로운 두 개의 파라미터가 등장한다. 첫 번째 파라미터는 몇 개의 Decision Tree를 생성할 것인지에 대한 파라미터이며, 본 예시에서는 20으로 정하였다. 한 개의 Decision Tree를 생성하는 것이 아니라 20개의 Tree를 생성하기 때문에 지금까지 수행하였던 예시들보다 수행 시간이 오래 걸린다. 두 번째 파라미터는 트리의 각 레벨에서 어떤 feature를 선택할지에 대해 평가를 하는 방식인데, 여기서는 "auto"를 이용하였다. Random Decision Forest에서는 모든 feature를 전부 고려하지 않고 전체 중 일부만 선택하여 활용하는데, 어떤 방식으로 일부를 선택할 것인지에 대한 것이다. 예측 과정은 단순히 Decision Tree들의 가중평균으로 계산된다. Categorcal Feature는 결과 중 가장 많이 나온 값으로 선택하는 방식을 따른다.

다음과 같이 생성한 Random Decision Forest의 성능을 평가할수 있다.

def getForestMetrics(model: RandomForestModel, data: RDD[LabeledPoint]): MulticlassMetrics = {
    val predictionsAndLabels = data.map {
        example => (model.predict(example.features), example.label)
    }
    new MulticlassMetrics(predictionsAndLabels)
}

val accuracy = getForestMetrics(forest, testData).precision

...

accuracy: Double = 0.9632742522824155

트레이닝 데이터로만 생성한 Random Decision Forest의 정확성은 96.3%로 지금까지 생성하였던 단일 Decision Tree보다 성능이 높음을 알 수 있다.

Random Decision Forest는 Spark와 MapReduce와 같은 빅 데이터의 데이터 처리 방식과도 연관성이 있다. Random Decision Forest를 생성할 때는 각각의 Tree를 독립적으로 생성하는데, 이것은 언급한 빅데이터 기술들은 데이터를 독립적으로 처리하는 매커니즘을 포함하고 있기 때문이다.



Making Predictions

Decision Tree와 Random Decision Forest를 생성하는 과정이 흥미로웠지만, 이것은 완전한 목표가 아니다. 최종 목표는 주어진 벡터를 생성한 Model을 이용해 결과를 예측하는 것이다. 근데, 지금까지의 과정을 제대로 따라왔다면 정말로 쉽다. 이미 앞선 과정에서 getMetrics 함수와 getForestMetrics 함수 안에서 해당 과정을 수행하고 있기 때문이다.

DecisionTree와 RandomForest의 훈련 결과는 각각 DecisionTreeModel과 RandomForestModel 객체인데, 두 객체는 모두 predict()함수를 포함하고 있다. 이 함수는 벡터를 인자로 받아 해당 데이터에 맞는 예측 결과를 반환하는 함수이다. 따라서 다음과 같이 특정 벡터에 대해 그 결과를 예측할 수 있다.

val input = "2709,125,28,67,23,3224,253,207,61,6094,0,29"
val vector = Vectors.dense(input.split(',').map(_.toDouble))
forest.predict(vector)

...

Double = 4.0

이 결과는 4가 나와야 하는데, 이것은 위 벡터 "2709,125,28,67,23,3224,253,207,61,6094,0,29"의 예측 결과가 숲의 종류 5임을 뜻한다(원래의 데이터는 클래스가 1 부터 시작하고, 트레이닝에서 활용한 데이터는 0부터 시작하는 것으로 변경하였으므로). 이것은 Covtype 데이터 셋에서 "Aspen" 임을 뜻한다.

위 예시에서는 단순히 하나의 벡터에 대해서만 수행했지만, RDD로 구성된 여러 개의 벡터를 한번에 예측할 수도 있다.

Read More...
Spark & 머신 러닝 - Recommending Music - 2/2

지난 포스트에 이어 ALS를 이용한 추천 알고리즘의 성능을 평가하는 과정에 대한 글이다.

이 포스트는 Advanced Analytics with Spark을 정리한 글이다.

Evaluating Recommendation Quality

추천의 수행 결과를 평가하는 것으로 가장 정확한 방법은, 각 사용자가 추천 결과를 보고 그것에 대해 평가를 내리는 것이 가장 정확한 방식이다. 하지만 이러한 과정은 몇몇의 사용자를 샘플링 하여 진행한다고 하더라도 실제적으로 불가능에 가까운 방식이다. 때문에 사용자들이 들었던 아티스트들은 끌리는 아티스트들이고, 사용자들이 듣지 않은 아티스트들은 그렇지 않은 아티스트라고 가정하여 평가를 수행하는 것이 납득할만한 방법이다. 이러한 가정은 문제를 위한 가정이긴 하지만, 다른 데이터를 추가적으로 사용하지 않고 적용시킬 수 있다는 장점이 있다.

이 방법을 이용하여 추천 모델을 평가하기 위해서는 데이터를 분리하여 분리된 데이터는 ALS 모델을 생성하는 과정에서 제외시키는 과정이 필요하다. 그러면 이 분리된 데이터는 사용자들에 대한 좋은 추천 결과들을 가지고 있는 것으로 해석될 수 있다. 결과적으로 추천 시스템은 분리된 데이터를 제외하고 추천 모델을 생성시킨 후에 추천을 수행할 것이고, 추천이 이상적이라면 추천 시스템이 생성한 추천 결과의 상위권에 이 분리된 데이터들이 존재해야 할 것이다.

추천 결과를 분리한 아티스트의 리스트와 비교하여 0.0에서 1.0의 범위를 갖는 값(높을 수록 좋은 추천 결과를 나타냄)으로 수치화시킬 수 있다. (모든 아티스트의 쌍과 비교할 수 있지만, 그렇게 되면 너무 많은 쌍이 발생할 수 있기 때문에 일부 샘플된 쌍만 비교하는 것으로 한다.) 그리고 여기서 0.5는 무작위로 추천을 수행하였을 때의 기대값이라 한다.

이 방식은 ROC Curve와 직접적인 연관성을 갖는다. 앞서 얘기한 방식은 AUC, Area Under the Curve을 나타내는데, 이것은 무작위로 생성된 추천 결과에 비해 좋은 추천 결과들이 얼마만큼의 좋은 추천을 수행했는가 판단하는데에 이용된다.

AUC는 일반적인 Binary Classifier 와 같은 일반적인 Classifier 에서도 평가 방법으로 많이 이용된다. Spark의 MLlib에서는 BinaryClassificationMetrics에 이것이 구현되어있다. 이 글에서는 각 사용자별 AUC를 계산하고, 그것을 평균낼 것이다.



Computing AUC

이 절에서 수행되는 코드는 지난 포스트의 코드까지 수행된 것을 가정한다.

val allData = rawUserArtistData.map{ line => 
    val Array(userId, artistId, count) = line.split(' ').map(_.toInt)
    val finalArtistId = bArtistAlias.value.getOrElse(artistId, artistId)
    Rating(userId, finalArtistId, count)
}.cache()

val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()

val allItemIDs = allData.map(_.product).distinct().collect()
val bAllItemIDs = sc.broadcast(allItemIDs)

val model = ALS.trainImplicit(trainData, 20, 5, 0.01, 1.0)

위 코드는 데이터셋을 9:1의 비율로 나누어 90%의 데이터를 트레이닝 데이터로, 나머지 10%의 데이터를 Cross-Validation 데이터로 사용하여 추천 모델을 훈련시키는 과정을 나타낸다.

그리고 다음 코드는, 생성된 추천 모델의 predict 함수를 이용하여 AUC를 계산하는 것에 대한 함수이다. 이 함수를 그대로 shell 에 입력하거나, 따로 파일에 작성하여 이전 포스트 에서 수행하였던 방식처럼 불러와도 된다.

import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// 각 사용자별로 AUC를 계산하고, 평균 AUC를 반환하는 함수.
def areaUnderCurve(
      positiveData: RDD[Rating],
      bAllItemIDs: Broadcast[Array[Int]],
      predictFunction: (RDD[(Int,Int)] => RDD[Rating])) = {

    // Positive로 판단되는 결과들, 즉 전체 데이터에서 Cross-validation을 하기 위해 남겨둔
    // 10%의 데이터를 이용하여 Positive한 데이터로 저장한다.
    val positiveUserProducts = positiveData.map(r => (r.user, r.product))
    // Positive 데이터에서 (사용자, 아티스트ID)별로 각각의 쌍에 대한 예측치를 계산하고,
    // 그 결과를 사용자별로 그룹화한다.
    val positivePredictions = predictFunction(positiveUserProducts).groupBy(_.user)

    // 각 사용자에 대한 Negative 데이터(전체 데이터셋 - Positive 데이터)를 생성한다.
    // 전체 데이터 셋에서 Positive 데이터를 제외한 아이템 중 무작위로 선택한다.
    val negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions {
      // 각 파티션에 대해서 수행한다.
      userIDAndPosItemIDs => {
        // 각 파티션 별로 난수 생성기를 초기화
        val random = new Random()
        val allItemIDs = bAllItemIDs.value

        userIDAndPosItemIDs.map { case (userID, posItemIDs) =>
          val posItemIDSet = posItemIDs.toSet
          val negative = new ArrayBuffer[Int]()
          var i = 0
          // Positive 아이템의 갯수를 벗어나지 않도록하는 범위 내에서
          // 모든 아이템 중 무작위로 아이템을 선택하여
          // Positive 아이템이 아니라면 Negative 아이템으로 간주한다.
          while (i < allItemIDs.size && negative.size < posItemIDSet.size) {
            val itemID = allItemIDs(random.nextInt(allItemIDs.size))
            if (!posItemIDSet.contains(itemID)) {
              negative += itemID
            }
            i += 1
          }
          // (사용자 아이디, Negative 아이템 아이디)의 쌍을 반환한다.
          negative.map(itemID => (userID, itemID))
        }
      }
    }.flatMap(t => t)
    // flatMap을 이용하여 묶여져 있는 셋을 하나의 큰 RDD로 쪼갠다.

    // Negative 아이템(아티스트)에 대한 예측치를 계산한다.
    val negativePredictions = predictFunction(negativeUserProducts).groupBy(_.user)

    // 각 사용자별로 Positive 아이템과 Negative 아이템을 Join 한다.
    positivePredictions.join(negativePredictions).values.map {
      case (positiveRatings, negativeRatings) =>
        // AUC는 무작위로 선별된(처음에 10%를 무작위로 분리하였으므로) Positive 아이템의 Score가
        // 무작위로 선별된(negativeUserProducts 를 구할 때 무작위로 선택하였으므로) Negative 아이템의 Score보다
        // 높을 확률을 나타낸다. 이때, 모든 Postive 아이템과 Negative 아이템의 쌍을 비교하여 그 비율을 계산한다.

        var correct = 0L
        var total = 0L
        // 모든 Positive 아이템과 Negative 아이템의 쌍에 대해
        for (positive <- positiveRatings; negative <- negativeRatings) {
          // Positive 아이템의 예측치가 Negative 아이템의 예측치보다 높다면 옳은 추천 결과
          if (positive.rating > negative.rating) {
            correct += 1
          }
          total += 1
        }
        // 전체 쌍에서 옳은 추천 결과의 비율을 이용한 각 사용자별 AUC 계산
        correct.toDouble / total
    }.mean() // 전체 사용자의 AUC 평균을 계산하고 리턴한다.
  }

위 함수를 이용하여 다음과 같이 AUC를 계산할 수 있다. 함수의 동작 과정에 대한 설명은 코드에 포함되어 있는 주석으로 대신한다.

val auc = areaUnderCurve(cvData, bAllItemIDs, model.predict)
...
auc: Double = 0.9623184489901165

수행 결과는 조금 다를 수 있겠지만 거의 0.96에 가까운 수치가 나올 것이다. 이 수치는 무작위로 추천을 수행했을 때의 기대값인 0.5 보다 많이 높은 값이며, 최대값인 1.0에 매우 가까운 수치이다. 따라서 괜찮은 추천을 수행해 주었다고 할 수 있다.

이 과정을 전체 데이터 셋을 90%의 트레이닝 데이터와 나머지 10% 데이터로 구분하는 것부터 다시 수행함으로써 좀 더 최적화된 평가 수치를 얻을 수 있다. 실제로 전체 데이터 셋을 \(k\)개의 서브셋으로 분리하고, \(k-1\)개의 서브셋을 트레이닝 데이터로, 나머지 한 개의 서브셋을 평가용으로 사용하여 \(k\)번 반복하는 방식이 존재한다. 이것이 일반적으로 불리는 K-fold Cross-validation 방식이다.

앞서 계산한 결과가 어느정도의 결과를 갖는지 간단한 벤치마크 값을 계산하여 비교해 볼 수도 있다. 모든 사용자에게 가장 많이 플레이 된 아티스트를 똑같이 추천해 주는 것이다. 이런 추천은 개인화된 추천이 아니지만 간단하고, 빠른 방법이다. 이 경우의 AUC를 계산하여 앞서 계산한 결과와 어느정도 차이가 있는지 확인해 볼 수 있다.

다음과 같이 함수 predictMostListened함수를 정의하여 사용한다.

import org.apache.spark.SparkContext

def predictMostListened(sc: SparkContext, train: RDD[Rating])(allData: RDD[(Int, Int)]) = {
    val bListenCount = sc.broadcast(
        train.map(r => (r.product, r.rating)).reduceByKey(_ + _).collectAsMap()
    )
    allData.map { case (user, product) =>
        Rating(user, product, bListenCount.value.getOrElse(product, 0.0))
    }
}

val auc = areaUnderCurve(cvData, bAllItemIDs, predictMostListened(sc, trainData))

이 결과는 0.93 정도가 나온다. 앞서 우리가 추천 모델을 이용하여 수행한 추천의 결과가 더 높은 것을 알 수 있다. 하지만 좀 더 결과를 좋게 만들 수 없을까?

Hyperparameter Selection

한 가지 간단한 방법은 추천 모델 형성에 사용된 몇 개의 Hyperparameter를 조절해보는 것이다. 지금까지의 추천 모델 형성 과정에서는 이 값에 대해 언급이 없었지만, 사용되었던 파라미터와 그 기본값은 다음과 같다.

rank = 10

rank 파라미터는 user-feature 행렬과 product-feature 행렬을 구성할 때 column \(k\)의 크기를 의미한다.

iterations = 5

iterations는 Matrix Factorization 과정을 몇번 반복할 것인가에 대한 것이다. 횟수가 많아질 수록 추천의 성능은 좋아지지만, 수행 시간이 늘어난다.

lambda = 0.01

Overfitting을 막아주는 파라미터이다. 값이 높을수록 Overfitting 을 막아주지만, 너무 높다면 추천의 정확도를 저하시킨다.

alpha = 1.0

Alpha는 Implicit Feedback 방식에서 사용되는 파라미터로, user-product의 baseline confidence(값이 존재하는 데이터와 그렇지 않은 데이터 중 어떤것에 초점을 둘 것인지)를 조절하는 파라미터이다.

이 파라미터들을 조절하여 추천 모델의 성능을 증가시킬 수 있다. 파라미터를 조절하여 최적의 값을 찾는 방식에는 다양한 방법이 있지만, 여기선 간단하게만 변화를 주어 테스트를 할 것이다. 다음 코드와 같이 각 rank, lambda, alpha 에 두 개의 값으로 변화를 주어 그 결과로 계산되는 AUC를 비교할 것이다.

val evaluations =
    for(rank    <- Array(10, 50);
        lambda  <- Array(1.0, 0.0001);
        alpha   <- Array(1.0, 40.0))
        yield {
         val model = ALS.trainImplicit(trainData, rank, 10, lambda, alpha)
         val auc = areaUnderCurve(cvData, bAllItemIDs, model.predict)
         ((rank, lambda, alpha), auc)
    }

evaluations.sortBy(_._2).reverse.foreach(println)

수행 결과는 다음과 같다.

((10,1.0,40.0),0.9775933769125035)
((50,1.0,40.0),0.9775096131405069)
((10,1.0E-4,40.0),0.9767512207167729)
((50,1.0E-4,40.0),0.9761886422104153)
((10,1.0,1.0),0.9691674538720272)
((50,1.0,1.0),0.9670028532287775)
((10,1.0E-4,1.0),0.9648010615992904)
((50,1.0E-4,1.0),0.9545102924987607)

위 결과로 보아 rank는 10, lambda는 1.0, alpha를 40으로 하였을 때가 기본 설정으로 하였을 때보다 추천 성능이 좋음을 알 수 있다. 이런 방식으로 추천 모델을 최적화할 수 있다.

여기서 각 파라미터가 추천 결과에 어떤 영향을 미치는지 분석할 수 있다. Alpha 파라미터는 1일 때보다 40일때 추천의 성능이 증가되었다. 흥미로운 점은 이 40이라는 값이 지난 포스트에서 언급한 논문이 제안한 기본값이라는 것이다. 그리고 낮은 값인 1 보다 큰 값인 40일 때 성능이 좋은 것으로 보아 사용자가 특정 아티스트를 들었다는 정보가 듣지 않았다는 정보보다 추천 모델을 형성하는데에 있어 더욱 효과적이라는 것을 나타낸다.

lambda는 매우 적은 차이를 이끌어낸다. 하지만 높은 Lambda를 사용하였을 때 추천 성능이 더욱 좋은 것으로 보아 Overfitting을 효과적으로 방지하였음을 알 수 있다. Overfitting에 대해서는 다음 장에서 자세하게 살펴 볼 것이다.

column의 크기 \(k\)는 rank 파라미터의 값으로 보아 크게 중요하지 않음을 알 수 있다. 오히려 값이 50으로 클 때가 성능이 더 좋지 않았다. 따라서 너무 큰 \(k\)를 설정하게 되면 오히려 추천 성능이 감소함을 유추할 수 있다.

파라미터를 설정할 때 모든 파라미터에 대해 완벽하게 이해하고 있을 필요까지는 없다. 하지만 적어도 파라미터들이 어느 범위의 값을 갖는지 정도를 안다면, 여러 모델을 최적화하는데 많은 도움이 된다.

Read More...