Spark & 머신 러닝 - Recommending Music - 1/2

이 글에서는 Spark를 이용하여 추천을 수행하는 과정에 대해 설명한다. Audioscrobbler Dataset 를 이용하여 사용자가 좋아할 만한 음악을 추천해 주는 작업을 할 것이다.

이 포스트는 Advanced Analytics with Spark을 정리한 글이다.

Chapter 3를 두 개의 글로 나누었다. 첫 번째 글은 추천을 수행하고, 간단히 추천 수행 결과를 확인해 보는 정도로 마무리 하고, 두 번째 글은 생성한 추천 모델이 얼마나 효과적으로 추천을 수행해주는지 분석하는 과정이다.



Introduction

추천 엔진은 사람들이 가장 쉽게 접할 수 있는 머신 러닝의 한 예라고 할 수 있다. Amazon, Youtube과 같은 사이트는 물론 대부분의 서비스는 자체적으로 추천 기능을 제공한다. 추천 시스템의 결과물은 현재 시스템을 사용하고 있는 사람이 좋아할만한 아이템이기 때문에, 다른 머신 러닝 알고리즘에 비해 좀 더 직관적으로 이해할 수 있다. 그만큼 추천 시스템은 많은 사람들에게 이미 널리 알려져 있고, 익숙한 머신 러닝 알고리즘이다.

이 챕터에서는 Spark에 정의되어 있는 핵심 머신 러닝 알고리즘 중 추천 시스템과 연관이 있는 것들에 대해 알아볼 것이고, 그것을 이용해 사용자에게 음악을 추천 해 줄 것이다. 이러한 과정들은 Spark와 MLlib의 실제 예시가 될것이며, 이어지는 다른 챕터들에서 사용하게 될 머신 러닝과 관련된 아이디어들에도 도움을 주게 될 것이다.



Data Set

이 챕터에서 수행할 예시 데이터는 Audtioscrobbler에서 제공하는 데이터셋이다. Audioscrobbler 는 Last.fm 에서 처음으로 활용한 음악 추천 시스템이다. Audioscrobbler는 사용자들이 듣는 음악 정보를 자동으로 서버로 전송하여 기록하는 API 를 제공하였는데, 이러한 API의 사용은 많은 사용자의 정보를 기록하는 것으로 이어졌고, Last.fm 에서는 이 정보를 강력한 음악 추천 엔진을 구성하는데 사용하였다.

그 당시의 대부분의 추천 엔진에 대한 연구는 평가기반(사용자 x가 아이템 a에 평점 5를 남겼다)의 데이터에 대한 것들이었다. 하지만 흥미롭게도 Audioscrobbler 데이터는 사용자들이 어떠한 음악을 플레이했다에 대한 정보(사용자 x는 음악 a를 플레이했다.)밖에 제공되지 않는다. 이러한 데이터는 기존 데이터에 비해 난이도가 있는데, 그 이유는 사용자가 음악을 재생했다는 정보가 그 음악을 좋아한다고는 볼 수 없기 때문이다. 이러한 형태의 데이터 셋을 Implicit Feedback Dataset이라 한다.

위 링크에서 데이터셋을 다운로드 받아 압축을 해제하면 몇 개의 파일이 나온다. 그 중 가장 핵심이 되는 데이터 파일은 user_artist_data.txt 파일이다. 이 파일은 141,000명의 사용자와 160만명의 아티스트에 대한 정보가 들어있으며 사용자의 아티스트에 대한 플레이 정보는 약 2400만 정도의 기록이 저장되어있다. artist_data.txt파일은 모든 아티스트에 대한 정보가 들어있지만, 이 데이터 안에는 같은 아티스트를 가리키지만 서로 다른 이름으로 저장되어 있는 경우가 있다. 때문에 이를 위해 같은 아티스트를 가리키고 있는 ID의 Map인 artist_alias.txt 파일이 존재한다.



The Alternating Least Squares Recommender Algorithm

추천을 수행하기에 앞서, 설명한 데이터 셋의 형태에 맞는 추천 알고리즘을 선택해야 한다. 우리가 가지고 있는 데이터 셋은 Implicit feedback 형태이며, 사용자에 대한 정보(성별, 나이 등)라던가 아티스트에 대한 정보 역시 존재하지 않는다. 따라서 활용할 수 있는 정보는 어떠한 사용자가 어떤 아티스트의 노래를 들었다 라는 정보 뿐이고, 이러한 기록만 이용해서 추천을 수행해야 한다.

이러한 조건에 알맞는 추천 알고리즘은 Collaborative Filtering, CF(협업 필터링)이다. CF는 아이템이나 사용자의 속성을 사용하지 않고 단순히 둘 사이의 관계정보(이 데이터 셋에서는 음악 플레이 여부)만 이용하여 추천을 수행하는 알고리즘이다.

CF에는 여러 알고리즘들이 존재하는데, 여기선 Matrix Factorization 모델을 이용한 추천 알고리즘을 이용하여 추천을 수행한다. Matrix Factorization 계열의 추천 알고리즘은 \( i \times j\) 크기의 행렬을 생성하고, 사용자 \(i\)가 아티스트 \(j\)의 음악을 플레이 했다는 정보를 행렬의 데이터로 이용한다. 이 행렬을 \(A\)라 할때, 전체 데이터에 비해서 사용자-아티스트의 조합이 매우 적기 때문에 행렬 \(A\)의 데이터는 듬성듬성 존재한다(Sparse 하다고 한다).

Matrix Factorization 방식에서는 이 행렬 \(A\)를 두 개의 작은 행렬 \(X\)와 \(Y\)로 쪼갠다. 이 때 각 행렬의 크기는 \(i \times k\), \(j \times k\)로, 원래 행렬의 행과 열의 크기가 매우 크기 때문에 두 행렬의 행 역시 매우 크다. 그리고 \(k\)는 Latent factor로써, 사용자와 아티스트 사이의 연관을 표현하는데에 이용된다.

Matrix Factorization

위 그림 3-1[1]과 같이 행렬 \(X, Y\)를 계산한 후에, 사용자 \(i\)의 아티스트 \(j\)에 대한 평점을 계산하기 위해서는 행렬 \(X\)의 \(i\)번째 행과, 행렬 \(Y^T\)의 \(j\)번째 열을 곱하여 계산한다.

이러한 방법을 기반으로 한 추천 알고리즘이 많이 존재 하는데, 이 챕터에서 사용되는 알고리즘은 Alternating Least Squares 알고리즘이다. 이 알고리즘은 Netflix Prize 에서 우승한 논문인 "Collaborative Filtering for the Implicit Feedback Datasets"과, "Large-scale Parallel Collaborative Filtering for the Netflix Prize"에서 주로 사용된 방식이다. 또한 Spark의 MLlib 에는 이 두 논문의 구현체가 구현되어 있다. 이 것을 이용해 이번 챕터를 진행 할 것이다.



Preparing the Data

다운로드 받은 Audtioscrobbler에서 제공하는 데이터셋을 압축 해제하고, HDFS에 업로드한다. 다음과 같이 /audio 경로에 업로드하는것을 가정한다.

-rw-r--r--   1 hyunje supergroup    2932731 2015-07-12 04:19 /audio/artist_alias.txt
-rw-r--r--   1 hyunje supergroup   55963575 2015-07-12 04:19 /audio/artist_data.txt
-rw-r--r--   1 hyunje supergroup  426761761 2015-07-12 04:19 /audio/user_artist_data.txt

또한, 데이터의 크기가 크고, 계산량이 많기 때문에 Spark Shell 을 수행시킬 때 다음과 같이 드라이버의 메모리 용량을 6GB이상을 확보시켜야 한다.

spark-shell --driver-memory 6g --master local[2]

Spark의 MLlib 에 한 가지 제한이 있는데, 사용자와 아이템 아이디의 크기가 Integer.MAX_VALUE 보다 크면 안된다는 것이다. 즉 2147483647을 초과할 수 없다. 이를 다음과 같이 확인해 볼 수 있다.

val rawUserArtistData = sc.textFile("/audio/user_artist_data.txt")
rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
rawUserArtistData.map(_.split(' ')(1).toDouble).stats()

위 명령어의 수행 결과는 다음과 같으며, 이는 데이터를 다른 변환 없이 그대로 사용해도 무방함을 나타낸다.

org.apache.spark.util.StatCounter = (count: 24296858, mean: 1947573.265353, stdev: 496000.544975, max: 2443548.000000, min: 90.000000)
org.apache.spark.util.StatCounter = (count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000)

그리고 추천을 수행하기 위해 아티스트의 데이터를 읽어 이를 기억해야 할 필요가 있다. 다음과 코드를 이용해 아티스트 데이터를 불러올 수 있다.

val rawArtistData = sc.textFile("/audio/artist_data.txt")
val artistById = rawArtistData.flatMap( line => {
    val (id, name) = line.span(_ != '\t')
    if (name.isEmpty) {
        None
    } else {
        try {
            Some((id.toInt, name.trim))
        } catch {
            case e: NumberFormatException => None
        }
    }
})

또한, 앞서 설명하였듯이 각 아티스트가 오타 등의 이유로 다른 텍스트로 표현될 수 있기 때문에 이를 하나로 통합시켜야 한다. artist_alias.txt 파일의 각 행은 두 개의 열 badID \t good ID로 이루어져 있으며, 해당 파일을 읽어 드라이버 에서 Map 형태로 기억하고 있는다. 이 작업은 다음 코드를 수행함으로써 이뤄진다.

val rawArtistAlias = sc.textFile("/audio/artist_alias.txt")
val artistAlias = rawArtistAlias.flatMap( line => {
    val tokens = line.split('\t')
    if(tokens(0).isEmpty) {
        None
    } else {
        Some((tokens(0).toInt, tokens(1).toInt))
    }
}).collectAsMap()

artistAlias.get(6803336)의 결과는 아이디 1000010이기 때문에, 다음과 같은 예시를 통해 정상적으로 데이터가 불러와졌는지 확인할 수 있다.

artistById.lookup(6803336)
artistById.lookup(1000010)

위 코드의 수행 결과는 각각 Aerosmith (unplugged)Aerosmith를 나타내며, 이는 정상적인 결과를 의미한다.

Building a First Model

Spark 의 MLlib에 구현되어 있는 ALS를 사용하기 위해선 두 가지의 변환 과정이 필요하다. 첫번째는 기존에 구한 아티스트의 ID를 앞서 생성한 Map 을 이용하여 같은 ID끼리 묶어야 하며, 데이터를 MLlib의 ALS에서 사용하는 입력 형태인 Rating 객체로 변환해야 한다. Rating객체는 사용자ID-ProductID-Value형태를 갖는 객체인데, 이름은 Rating 이지만 Implicit 형태의 데이터에서도 사용 가능하다. 이 챕터에서는 Value를 ProductID 를 아티스트의 ID, Value를 사용자가 해당 아티스트의 노래를 재생한 횟수로 사용할 것이다. 다음과 같은 코드를 이용하여 추천을 수행하기 위한 데이터를 준비한다.

import org.apache.spark.mllib.recommendation._

val bArtistAlias = sc.broadcast(artistAlias)
val trainData = rawUserArtistData.map( line => {
    val Array(userId, artistId, count) = line.split(' ').map(_.toInt)
    val finalArtistId = bArtistAlias.value.getOrElse(artistId, artistId)
    Rating(userId, finalArtistId, count)
}).cache()

위 코드에서 중요한 부분은 기존에 생성하였던 artistAlias Map 을 broadcast하는 과정이다. Broadcast를 하지 않는다면 artistAlias 를 Spark가 생성하는 모든 Task 마다 복사하여 사용하게 된다. 하지만 이러한 작업은 큰 비용을 소비한다. 각각의 과정은 최소 몇 메가 바이트 에서 몇십 메가 바이트(크기에 따라 다르며, 이 예시에서의 크기임)를 소비하기 때문에, JVM에서 생성하는 모든 Task 에 이 데이터를 복사한다는 것은 매우 비효율적이다.

따라서 생성한 Map을 Broadcasting 함으로써 Spark Cluster의 각 Executer 가 단 하나의 Map만 유지할 수 있도록 한다. 때문에 Cluster에서 여러 Executer 가 수많은 Task 를 생성할 때 메모리를 효율적으로 관리할 수 있도록 해준다.

그리고 지금까지 계산한 결과를 cache()를 통해 메모리에 임시 저장함으로써, trainData 변수를 접근할 때마다 map 을 다시 수행하는 것을 막는다.

생성한 변수들을 이용해 다음과 같이 추천 모델을 생성할 수 있다.

val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)

위 과정은 앞에서 설명한 MatrixFactoriation 방식을 이용해 추천 모델을 생성하는 과정이다. 이때 클러스터의 상태에 따라 수행시간은 몇 분 정도 수행될 수 있다. 그리고 다음 코드를 수행함으로써 내부 Feature 들이 정상적으로 계산되었는지 확인한다(정확한 값인지는 모르지만).

model.userFeatures.mapValues(_.mkString(", ")).first()
model.productFeatures.mapValues(_.mkString(", ")).first()

...

(Int, String) = (90,-0.8930547833442688, -0.7431690096855164, -0.6351532936096191, -0.28394362330436707, 0.14852239191532135, -0.37798216938972473, -0.923484742641449, -0.12640361487865448, 0.5575262308120728, -0.35868826508522034)
(Int, String) = (2,-0.08458994328975677, 0.027468876913189888, -0.16536176204681396, 0.08694511651992798, 0.019154658541083336, -0.12874850630760193, -0.04696394130587578, -0.0629991888999939, 0.15156564116477966, 0.0011008649598807096)

알고리즘이 랜덤성을 갖고 있기 때문에 수행 결과는 위와 다를 수 있다.



Spot Checking Recommendations

이제 실제로 사용자들에게 추천을 잘 수행해 주었는가를 확인해 봐야 한다. 2093760 사용자에 대해 과연 추천을 잘 수행했는지 확인해 볼 것이다.

val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).filter({
    case Array(user,_,_) => user.toInt == 2093760
})

val existingProducts = rawArtistsForUser.map({
    case Array(_,artist,_) => artist.toInt
}).collect.toSet

artistById.filter({
    case (id, name) => existingProducts.contains(id)
}).values.collect().foreach(println)

위 코드의 수행 결과는 다음과 같은 결과를 보이는데,

David Gray
Blackalicious
Jurassic 5
The Saw Doctors
Xzibit

이 결과는 2093760 사용자가 플레이한 아티스트의 목록이다. 플레이했던 아티스트로 보아, 주로 pop과 hip-hop 음악을 플레이했음을 알 수 있다. (물론 나를 포함한 이 글을 읽는 사람들은 한국인이기 때문에 잘 모를 것이다... 책에서 그렇다고 하니 일단 믿어 보자.) 이러한 정보를 갖고 있는 사용자에게는 어떤 아이템들을 추천 해 주었는가는 다음 코드를 이용해 확인할 수 있다.

val recommendations = model.recommendProducts(2093760, 5)
recommendations.foreach(println)

위 결과는 다음과 같이 상위 5개의 아이템을 추천해 준 결과를 출력한다.

Rating(2093760,1300642,0.027983077231064094)
Rating(2093760,2814,0.027609241365462805)
Rating(2093760,1001819,0.027584770801984716)
Rating(2093760,1037970,0.027400202899883735)
Rating(2093760,829,0.027248976510692982)

추천의 수행 결과는 앞서 생성하였던 Rating 객체를 이용하여 표현된다. Rating 객체에는 (사용자 ID, 아티스트 ID, 값) 형태의 데이터가 존재한다. 이름은 Rating 이지만 세번째 필드의 값이 그대로 평점 값을 나타내는 것은 아님을 주의해야한다. ALS 알고리즘에서는 이 값은 0 과 1 사이의 값을 가지며 값이 높을 수록 좋은 추천을 이야기한다.

다음 코드를 이용해 추천된 결과에서 각각의 아티스트 ID 가 어떤 아티스트인지 이름을 확인해 볼 수 있다.

val recommendedProductIDs = recommendations.map(_.product).toSet

artistById.filter({
    case (id, name) => recommendedProductIDs.contains(id)
}).values.collect().foreach(println)

수행 결과는 다음과 같다. 이 결과는 수행시마다 다를 수 있다.

50 Cent
Nas
Kanye West
2Pac
The Game

위 목록의 아티스트는 모두 hip-hop 관련 아티스트이다. 얼핏 보기엔 괜찮아 보이지만 너무 대중적인 가수들이며 사용자의 개인적인 성향을 파악하지는 못한 것 같은 결과를 보인다.



Next Post

지금까지는 사용자들의 음악 플레이 기록을 이용하여, 아티스트를 추천해주는 과정을 수행하였다. 다음 포스트에서는 수행한 추천이 얼마나 잘 수행되었는지 평가하는 과정을 진행할 것이다.



References

[1] : Advanced Analytics with Spark

Read More...
Spark & 머신 러닝 - Introduction to Spark

이 글에서는 Spark가 어떤 형태로 동작하는지 설명하고 간단한 예시를 통해 Spark에 적응하는 과정을 설명한다.

이 포스트는 Advanced Analytics with Spark을 정리한 글이다.

이 글에서 다루고자 하는 내용은 Chapter 2이다. Chapter 1은 빅데이터에 대한 개략적인 얘기와, 왜 Spark가 뜨고 있는지, Spark 가 데이터 분석에서 어떠한 역할을 하고 있는지에 대한 설명이 있었다. 그 내용들은 다른 자료들에 많이 있으므로 따로 정리는 하지 않았다.



Record Linkage

chapter 2에서는 Record Linkage와 비슷한 작업(ex : ETL)들을 Spark 로 어떻게 수행하는지에 대해 설명하고, Follow-up 할 수 있도록 하고있다.

실제 데이터를 이용해 분석을 수행할 때 대부분은 수집한 데이터를 그대로 이용하지 않는다. 수집한 데이터를 그대로 이용한다면 잘못된 데이터(값이 비어있거나, 필요없는 데이터가 섞여있거나, 같은 데이터가 중복되어 들어있거나 등등)들이 분석에 그대로 활용되기 때문에 이들을 잘 필터링 해야 한다.

이 책에서 Recoed Linkage는 위의 문제 중 같은 데이터가 다른 형태로 들어있을 때, 그것을 하나의 데이터로 간주하도록 하는 것이라 얘기하고 있다.

이러한 내용들을 수행하기 위해 Record Linkage Comparison Patterns Dataset을 이용한다.



The Spark Shell and SparkContext

Spark를 이용하는 방법은 크게 두 가지가 있다. 하나는 Spark Shell 을 통해서 REPL(read-eval-print loop) 형태로 Spark를 이용하는 방법이고, 나머지 방법은 Spark Application 을 IDE를 이용해 작성한 후, 그것을 패키징 하여 Spark Cluster로 Submit하여 수행하는 방법이다.

REPL을 이용하려면 다음과 같은 명령어를 이용해야한다.

(여기서, 개인적인 공부이기 때문에 로컬 클러스터에서 수행함을 가정한다. 또한, Shell은 Scala를 기반으로 작성해야 하기 때문에 Scala 에 대한 이해가 필요하다.)

$ spark-shell --master local[2]

만약 Spark 를 YARN을 이용해 수행시키고 싶다면 다음과 같이 입력한다.

$ spark-shell --master yarn-client


이 챕터에서는 http://bit.ly/1Aoywaq 링크의 데이터를 이용하고 있다. 이 데이터를 HDFS로 업로드해야 하기 때문에, 다음 과정을 이용해 데이터를 HDFS로 업로드 한다.

$ wget http://bit.ly/1Aoywaq -O donation.zip
$ unzip donation.zip
$ unzip 'block_*.zip'
$ hdfs dfs -mkdir /linkage
$ hdfs dfs -put block_*.csv /linkage

책에서는 (아직은) Spark-shell을 기준으로 설명하고 있다. 본 글 역시 다른 언급이 있지 않는 이상 Spark-shell 을 기준으로 설명할 것이다.

우선, 다음 명령어를 이용해 데이터가 HDFS에 정상적으로 업로드 되고, 그것을 잘 읽어 오는지 확인한다.

val rawblocks = sc.textFile("/linkage")
...
rawblocks: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

Spark Shell 에서는 기본적인 SparkContext 객체에 대한 인스턴스를 하나 제공한다. 그 인스턴스에 대한 접근은 sc로 할 수 있으며, textFile 함수를 이용해 HDFS에 저장되어 있는 파일을 읽어올 수 있다.

다음 명령어를 이용해 데이터의 상위 10 줄에 어떤 데이터가 들어있는지 확인한다.

val head = rawblocks.take(10)
head.foreach(println)

그러면 다음과 같은 값이 들어있음을 알 수 있다.

"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE

이 결과를 보면, csv 데이터의 제일 첫 줄에 각 컬럼이 나타내는 값이 어떤 것인지에 대한 정보가 있다. 이것을 다음과 같은 함수와 명령어를 이용하여 필터링한다.

def isHeader(line:String) : Boolean = {
    line.contains("id_1")
}

val noheader = rawblocks.filter(x => !isHeader(x))

그 결과로, 다음과 같이 컬럼명이 제외된 데이터 10개를 볼 수 있다.

37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE
46626,47940,1,?,1,?,1,1,1,1,1,TRUE



Structuring Data with Tuples and Case Classes

지금까지 읽은 데이터를 그대로 읽어서 분석에 활용할 수 있지만, 데이터를 파싱하여 활용하면 더욱 쉽게 활용할 수 있다.

데이터는 다음과 같은 형태를 띄고 있다.

  • 처음 2개의 Integer 값 : 레코드에서 매칭되는 환자의 ID
  • 9개의 Double 값 : 9가지의 필드에 대한 매칭 스코어(없을 수 있음)
  • Boolean 값 : 매치 되는지 여부에 대한 판별결과

이를 파싱하기 위해 다음과 같은 Case Class를 정의하고 필요한 서브 함수를 작성하여 활용한다.

case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)

def toDouble(s: String) = {
    if("?".equals(s)) Double.NaN else s.toDouble
}

def parse(line: String) = {
    val pieces = line.split(',');
    val id1 = pieces.apply(0).toInt
    val id2 = pieces.apply(1).toInt
    val scores = pieces.slice(2, 11).map(toDouble)
    val matched = pieces.apply(11).toBoolean
    MatchData(id1, id2, scores, matched)
}

다음 명령어를 통해 데이터를 파싱하고, 그 결과를 확인한다.

val parsed = noheader.map(line => parse(line))

parsed.take(10).foreach(println)

그리고, 지금까지 파싱한 결과를 메모리에 cache()시켜놓는다.

parsed.cache()



Creating Histograms

Histogram 은 간단히 말하면, 항목별로 개수를 센 결과를 나타낸다고 할 수 있다. 이 절에서는 지금까지 파싱한 데이터가 matched 필드 값의 종류(true, false)별로 얼마나 존재하는지에 대한 히스토그램을 생성할 것이다. 다행히도 Spark의 RDD에서 기본적으로 제공하는 countByValue를 이용하면 쉽게 해결할 수 있다.

다음 명령어를 통해 각 값 별로 얼마만큼의 레코드가 존재하는지 쉽게 카운팅 할 수 있다.

val matchCounts = parsed.map(md => md.matched).countByValue()

다음과 같은 수행 결과로, 손쉽게 matched 필드의 각 값 별 히스토그램을 생성할 수 있다.

matchCounts: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 5728201)



Summary Statistics for Continuous Variables

앞서 설명된 countByValue는 값의 종류에 따라 Histogram 을 생성하는 좋은 방법 중 하나이다. 하지만 Boolean 형태의 값처럼 적은 범위를 갖는 값이 아니라 Continuous Variable, 즉 연속변수와 같은 경우에 사용하기에는 적절하지 않다.

연속변수들에 대해서는 모든 값의 Histogram 을 구하는 것보다 분포에 대한 확률적인 통계 수치(평균, 표준편차, 최대 or 최솟값 등)를 보는 것이 좀 더 간결하게 데이터를 파악할 수 있다.

Spark에서는 이를 위해 stats라는 함수를 제공한다. 이 함수를 이용함으로써 손쉽게 특정 변수에 대한 통계적 수치를 출력할 수 있다. 파싱한 값중에 NaN이 들어가 있을 수 있기 때문에, 해당 레코드는 필터링을 수행한 후에 수행시키도록 한다.

import java.lang.Double.isNaN
parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()

위 코드의 수행 결과로 다음과 같은 결과를 볼 수 있다.

org.apache.spark.util.StatCounter = (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)

또한 Scala의 Range를 이용하여 scores 배열에 들어있는 모든 변수에 대한 수치값들에 대한 통계치를 구할 수 있다.

val stats = (0 until 9).map(i => {
    parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats()
})
stats.foreach(println)

그 결과는 다음과 같다.

(count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)
(count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000)
(count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000)
(count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000)
(count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000)
(count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000)
(count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000)
(count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0.000000)
(count: 5736289, mean: 0.005529, stdev: 0.074149, max: 1.000000, min: 0.000000)



Creating Reusable Code for Computing Summary Statistics

하지만, 위와 같은 작업은 매우 비효율적이다. scores 배열에 존재하는 값에 대한 각각의 통계치를 계산하기 위해서 parsed RDD를 매번 다시 계산해야 한다. 물론 앞선 과정에서 parsed RDDcache 해 놓긴 하였지만, 데이터가 많아지면 많아질 수록 이 작업의 소요시간은 급속도로 증가할 것이다.

이러한 경우에, 어떠한 RDD[Array[Double]]를 인자로 받아, 값이 정상적으로 들어있는 레코드들에 대한 각 인덱스별 StatCounter 를 갖는 클래스 혹은 함수를 작성하는 것을 생각해 볼 수 있다.

또한, 이러한 작업이 분석 과정에서 반복될 때, 매번 해당 코드를 새롭게 작성하는 것보다 다른 파일에 작성하여 그것을 재사용하는 것이 적절한 방식이다. 때문에 다른 파일에 스칼라 코드를 작성하고, Spark에서 그 파일을 불러와 사용하도록 할 것이다. 다음 소스코드를 다른 파일 StatsWithMissing.scala파일에 저장한 후 사용할 것이며, 멤버변수와 함수에 대해서는 코드의 뒷 부분에서 설명할 것이다.

import org.apache.spark.util.StatCounter

class NAStatCounter extends Serializable{
    val stats: StatCounter = new StatCounter()
    var missing: Long = 0

    def add(x: Double): NAStatCounter = {
        if(java.lang.Double.isNaN(x)) {
            missing += 1
        } else {
            stats.merge(x)
        }
        this
    }

    def merge(other: NAStatCounter): NAStatCounter = {
        stats.merge(other.stats)
        missing += other.missing
        this
    }

    override def toString = {
        "stats: " + stats.toString + " NaN: " + missing
    }
}

object NAStatCounter extends Serializable {
    def apply(x: Double) = new NAStatCounter().add(x)
}

앞서 정의한 NAStatCounter 클래스는 두 개의 멤버 변수를 갖고 있다. stats로 정의된 StatCounter 인스턴스는 immutable이고, missing으로 정의된 Long 변수는 mutable 변수이다. 이 클래스를 Serializable 객체를 상속시킨 이유는, Spark 의 RDD에서 이 객체를 사용하기 위해선 반드시 상속을 시켜주어야 한다. 만약 이 상속을 하지 않으면 RDD에서 에러가 발생한다.

클래스의 첫 번째 함수 add는 새로운 Double 형태의 값을 받아 stats변수가 값을 계속 관측할 수 있도록 한다. 만약 인자로 받은 값이 NaN이면, missing 값을 1 증가시키고, NaN이 아니라면 StatCounter 객체에 기록한다. 두 번째 함수 merge는 다른 NAStatCounter 인스턴스를 매개변수로 받아 지금의 인스턴스와 병합시키는 역할을 한다. 세 번째 함수 toString은 쉽게 NAStatCounter 클래스를 출력하기 위해서 오버라이딩 한 것이다. 스칼라에서는 부모 객체의 함수를 오버라이딩 하기 위해선 반드시 함수 앞에 override 키워드를 추가해야 한다.

그리고 class 정의와 함께 NAStatCounter 객체에 대한 companion object를 함께 정의한다. 스칼라에서 object 키워드는 자바에서의 static method 와 같이 어떤 클래스에 대한 helper method를 제공하는 싱글톤 객체를 선언하는데에 이용된다. 이 경우에서처럼 class 이름과 같은 object를 선언하는 것을 companion object를 선언한다고 하며, 여기서의 apply 함수는 NAStatCounter 클래스에 대한 새 인스턴스를 생성하고, 그 인스턴스를 반환하기 전에 Double 값을 더한다.

정상적으로 로드가 되었다면 다음과 같은 메시지가 출력된다.

import org.apache.spark.util.StatCounter
defined class NAStatCounter
defined module NAStatCounter
warning: previously defined class NAStatCounter is not a companion to object NAStatCounter.
Companions must be defined together; you may wish to use :paste mode for this.

경고가 출력되어 문제가 생긴것이 아닌가 할 수 있지만, 무시할 수 있는 경고이다. 다음과 같은 예시로 정상적으로 로드되었는지 확인해 볼 수 있다.

val nas1 = NAStatCounter(10.0)
nas1.add(2.1)
val nas2 = NAStatCounter(Double.NaN)
nas1.merge(nas2)

이제 작성한 NAStatCounter클래스를 이용하여 parsed RDD에 들어있는 MatchData 레코드를 처리하자. 각각의 MatchData 인스턴스는 Array[Double] 형태의 매칭 스코어를 포함하고 있다. 배열에 있는 각각의 엔트리마다 NAStatCounter 객체를 생성시켜, 모든 값을 추적하려 한다. 그렇게 하기 위해선 다음과 같은 방식으로 RDD 안에 존재하는 모든 레코드는 Array[Double]을 갖고 있기 때문이 이것을 Array[NAStatCounter]를 갖는 RDD로 변경하면 된다.

val nasRDD = parsed.map(md => {
    md.scores.map(d => NAStatCounter(d))
})

이제, 여러개의 Array[NAStatCounter] 를 하나의 배열로 합치면서 각 인덱스별로 존재하는 NAStatCounter를 병합하면 된다. 이를 위해 다음과 같은 코드를 이용한다.

val reduced = nasRDD.reduce((n1, n2) => {
    n1.zip(n2).map { case (a, b) => a.merge(b) }
})
reduced.foreach(println)

그 결과로 다음과 같이 모든 매칭 스코어에 대한 인덱스별 통계 수치를 구할 수 있다.

stats: (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000) NaN: 1007
stats: (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000) NaN: 5645434
stats: (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000) NaN: 0
stats: (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000) NaN: 5746668
stats: (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000) NaN: 0
stats: (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000) NaN: 795
stats: (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000) NaN: 795
stats: (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0.000000) NaN: 795
stats: (count: 5736289, mean: 0.005529, stdev: 0.074149, max: 1.000000, min: 0.000000) NaN: 12843

또한 다음과 같이 statsWithMissing함수를 정의하고 이를 사용함으로써 좀 더 고급지게(?) 처리할 수 있다.

def statsWithMissing(rdd: RDD[Array[Double]]): Array[NAStatCounter] = {
    val nastats = rdd.mapPartitions((iter: Iterator[Array[Double]]) => {
        val nas: Array[NAStatCounter] = iter.next().map(d => NAStatCounter(d))
        iter.foreach(arr => {
            nas.zip(arr).foreach { case (n, d) => n.add(d)}
        })
        Iterator(nas)
    })
    nastats.reduce((n1, n2) => {
        n1.zip(n2).map { case (a, b) => a.merge(b)}
    })
}



Simple Variable Selection and Scoring

앞서 작성한 statsWithMissing 함수를 이용해 다음과 같이 parsedRDD 로부터 각 데이터가 매치되는지 여부에 따라 scores의 분포가 어떻게 되는지 구할 수 있다.

val statsm = statsWithMissing(parsed.filter(_.matched).map(_.scores))
val statsn = statsWithMissing(parsed.filter(!_.matched).map(_.scores))

두 변수 statsmstatsn은 전체 데이터를 매치되는지 여부에 따라 두 개의 서브셋으로 나누어 scores의 확률적 분포에 대한 정보를 갖고 있다. 여기서 두 개의 각 서브셋에 존재하는 scores 배열의 각 칼럼 별 정보를 비교함으로써, 두 서브셋 차이에 각 feature 별로 어떤 차이를 갖고 있는지를 비교할 수 있다.

statsm.zip(statsn).map { case(m, n) =>
    (m.missing + n.missing, m.stats.mean - n.stats.mean)
}.zipWithIndex.foreach(println)

위 코드의 수행 결과로 다음과 같은 결과가 나온다.

((1007,0.285452905746686),0)
((5645434,0.09104268062279874),1)
((0,0.6838772482597568),2)
((5746668,0.8064147192926266),3)
((0,0.03240818525033473),4)
((795,0.7754423117834044),5)
((795,0.5109496938298719),6)
((795,0.7762059675300523),7)
((12843,0.9563812499852178),8)

좋은 feature 는 두 가지의 속성이 있다. 하나는 그 feature에 따른 크기가 큰 차이가 존재하는 것이고, 모든 데이터 쌍(두 서브셋 사이의)에 대해서도 균일하게 발생한다는 것이다. 이러한 이론에 따라 인덱스 1의 feature 는 좋은 feature라고 할 수 없다. 값이 존재하지 않아 missing이 카운트 된 횟수가 매우 많으며, 두 서브셋의 평균값의 차이가 0.09로 매우 적다(값의 범위가 0 에서 1인 것임을 감안했을 때). Feature 4 또한 적절치 않다. Feature 4는 모든 값이 존재하지만 평균 값의 차이가 0.03이므로 두 데이터 그룹 사이에 별 차이가 없기 때문이다.

반면, feature 5와 7은 훌륭한 feature 이다. 대부분의 데이터에 대해 값이 존재하며, 두 그룹의 평균 차이가 매우 크기 때문이다. 그리고 feature 2, 6, 8 역시 괜찮은 feature 라 할 수 있다. Feature 0과 3은 좀 애매하다고 할 수 있다. Feature 0은 대부분의 데이터에서 관측 가능하지만 두 셋의 평균 차이가 크지 않고, 반대로 Feature 3은 두 셋의 평균 차이가 크지만 많은 데이터에서 관측하기가 어렵다. 이 정보들은 두 데이터 셋을 명확하게 표현하기가 어렵다.

앞서 설명한 내용을 바탕으로하여 쓸만한 feature(2, 5, 6, 7, 8)를 이용해 각 데이터에 대한 scoring model을 만들 것이다. 이 모델에서는 NaN 값은 0으로 처리하여 계산할 것이다.

def naz(d: Double) = if (Double.NaN.equals(d)) 0.0 else d
case class Scored(md: MatchData, score: Double)
val ct = parsed.map(md => {
    val score = Array(2, 5, 6, 7, 8).map(i => naz(md.scores(i))).sum
    Scored(md, score)
})
ct.map(s => s.md.matched).countByValue()

...

scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 5728201)

생성한 ct RDD 에 여러 Threshold 값들을 지정하고, 매치되었는지 여부에 따라 카운팅을 함으로써 데이터의 속성에 대해 파악할 수 있다.

다음 결과는 Threshold 를 4.0 으로 정하였는데, 이것은 각각의 feature 에 대해 평균적으로 0.8 이상을 갖고 있음을 의미한다.

ct.filter(s => s.score >= 4.0).map(s => s.md.matched).countByValue()

...

scala.collection.Map[Boolean,Long] = Map(true -> 20871, false -> 637)

위와 같은 결과는 matched에 속하는 데이터 중 true를 갖는 데이터들의 99%이상이 feature(2, 5, 6, 7, 8) 합이 4 이상임을 나타내고, false를 갖는 데이터는 대부분(98.8%)이 4.0 이하의 값을 갖는다는것을 얘기한다.

다음과 같이 score 값을 2.0 으로 필터링 하면 다음과 같은 결과를 얻을 수 있다.

ct.filter(s => s.score >= 2.0).map(s => s.md.matched).countByValue()

...

scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 596414)

이 결과는 matched 값이 true 인 경우에는 모든 데이터에가 feature 의 합이 2.0 이상이라는 것과, matched 값이 false인 경우에는 여전이 90% 이상이 feature 합이 2.0 미만이라는 것을 알 수 있다.

앞의 예시에서는 매우 간단한 특정 feature들의 합으로 matched 값에 따라 분류된 두 데이터 셋의 특성에 대해 알아봤지만, 이를 다양하게 변화시킴으로써 주어진 데이터셋에 대해 또 다른 새로운 정보들을 얻을 수 있을 것이다.

지금까지 Spark를 사용해보고, 이를 이용해 간단한 데이터셋을 필터링하고, 데이터셋의 특성에 대해 파악해보았다. 다음 장부터는 또 다른 데이터 셋을 이용해 좀 더 깊이있는 분석을 해 볼 것이다.

Read More...
Spark & 머신 러닝 - Overview

이 글들은 Advanced Analytics with Spark를 공부하면서 정리한 글이다.

Advanced Analytics with Spark는 2015년 4월에 발매된 책으로, Spark 를 이용해서 데이터 분석을 수행하는 방법과 예시에 대해 작성되어 있다.

Spark에 대한 기초적인 내용부터 시작하여 여러 오픈되어 있는 데이터셋을 이용해 데이터 분석을 수행하는 과정이 담겨있다. Spark에서 제공하는 mllib과 graphX 등을 사용하여 분석을 수행한다.

책에서는 Spark 버전은 1.3을 기준으로 하고 있지만, ~~최근 1.4가 공개되었으므로 1.4로 진행할 것이다.~~ 1.5로 진행할 것이다.

진행하는데에 앞서 필요한 사항과 개발 환경은 다음과 같다. 아직 책의 초반을 진행하고 있어서 IDE를 사용할 일이 적은데, 사용하게 된다면 IntelliJ IDEA를 사용할 것이다.


Pre-requirements

  • Hadoop 2.6+
  • Spark 1.4
  • Spark와 Hadoop에 대한 기본적인 지식

Environment

  • Scala 2.10
  • Intellij IDEA 14 (사용하게 되면)

Target

  • 제대로 된 Spark의 사용법
  • 데이터 분석 경험



아직까지 내가 겪어본 Spark는 단순하게 예제를 돌려보거나, 혼자의 감에 의지한 주먹구구식의 분석 뿐이었다. 이 책을 따라 공부하면서 제대로 Spark를 사용해 보고 데이터 분석을 경험해 볼 것이다. 그리고 그 경험을 이용해 내가 진행하였던 몇몇 분석 결과와 코드 등을 다시 돌아보고, 수정하여 말끔하게 다듬을 것이다.

또한 진행 도중에 괜찮은 아이디어가 떠오르면 그것을 직접 구현하고 결과를 공개 할 계획이다.

Read More...