이 글은 Spark를 기반으로 하여 추천 알고리즘을 수행하는 과정에 대한 것입니다.
추천에 사용되는 데이터셋은 MovieLens를 기준으로 하였습니다.
WARNING
이 글은 최신버전을 기준으로 설명된 글이 아닙니다. 최신 버전을 대상으로 하였을 때와 설치 과정 혹은 출력 결과가 다를 수 있습니다.
Environment
Dependency
Spark의 sub-project인 MLlib 프로젝트에서 이미 추천 알고리즘에 대한 라이브러리를 구현해 놓았습니다. 이를 사용하기 위해서 pom.xml에 다음과 같은 dependency를 추가합니다.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.10</artifactId>
<version>1.1.0</version>
</dependency>
Recommendation Module
다음과 같은 형태로 추천 알고리즘을 사용하고 그 결과를 저장합니다.
JDK 1.7을 사용하였기 때문에 람다표현을 사용하지 않았습니다. Java 8을 사용하면 람다표현을 사용할 수 있으며, 좀 더 간략하게 Spark 프로그램을 작성할 수 있습니다.
SparkConf conf = new SparkConf().setAppName("Spark-recommendation").setMaster("yarn-cluster");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> data = context.textFile(INPUT_PATH);
JavaRDD<Rating> ratings = data.map(
new Function<String, Rating>() {
public Rating call(String s) {
String[] sarray = s.split(delimiter);
return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
Double.parseDouble(sarray[2]));
}
}
);
// Build the recommendation model using ALS
int rank = 10;
int numIterations = 20;
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
// Evaluate the model on rating data
JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
new Function<Rating, Tuple2<Object, Object>>() {
public Tuple2<Object, Object> call(Rating r) {
return new Tuple2<Object, Object>(r.user(), r.product());
}
}
);
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r) {
return new Tuple2<Tuple2<Integer, Integer>, Double>(
new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
}
}
));
//<<Integer,Integer>,Double> to <Integer,<Integer,Double>>
JavaPairRDD<Integer, Tuple2<Integer, Double>> userPredictions = JavaPairRDD.fromJavaRDD(predictions.map(
new Function<Tuple2<Tuple2<Integer, Integer>, Double>, Tuple2<Integer, Tuple2<Integer, Double>>>() {
@Override
public Tuple2<Integer, Tuple2<Integer, Double>> call(Tuple2<Tuple2<Integer, Integer>, Double> v1) throws Exception {
return new Tuple2<Integer, Tuple2<Integer, Double>>(v1._1()._1(), new Tuple2<Integer, Double>(v1._1()._2(), v1._2()));
}
}
));
//Sort by key & Save
userPredictions.sortByKey(true).saveAsTextFile(OUTPUT_PATH);
context.stop();
전체 소스코드는 github repository에 있습니다.
Preparation
Input file은 Spark repository의 ALS 샘플 데이터를 이용하였습니다.
해당 파일을 HDFS에 업로드 한 후, Input File 로 사용합니다.
위 코드에서는 INPUT_PATH로 정의되었지만, github repository 에서는 Apache Commons-cli 를 이용하여 입력받았습니다.
Run Spark Application
maven을 이용하여 프로젝트를 패키징 합니다.
$ mvn package
다음과 같은 명령어를 이용하여 작성한 Wordcount application을 YARN에 submit 합니다.
$ spark-submit --class 패키지명.클래스명 --master yarn-cluster Package된Jar파일.jar
(github repository에는 Apache Commons-cli 를 이용하여 실제 수행 command는 뒤에 옵션이 추가로 붙습니다.)
Result
수행 결과로 다음과 같은 결과가 출력됩니다.
(0,(34,0.9846535656842613))
(0,(96,0.8178838683876802))
...
(1,(96,1.2547672185210839))
(1,(4,1.941481009392396))
...
(29,(86,1.0588376599353693))
(29,(68,3.3195965377284837))
위 결과는 각각의 사용자 0 ~ 29에 대해 영화별 평점을 예측한 수치입니다.