HBase Pseudo-distributed Mode 설치법

이 문서는 OS X 에서 HBase를 설치하고, Hadoop2 와 연동하는 방법에 대한 글입니다.
Pseudo Distributed Mode로 설치하고 실행하는 과정입니다.
Java Client API 를 이용하여 간단한 테스트까지 수행합니다.

WARNING

이 글은 최신버전을 기준으로 설명된 글이 아닙니다. 최신 버전을 대상으로 하였을 때와 설치 과정 혹은 출력 결과가 다를 수 있습니다.


Install Environments

설치 과정에서 사용된 여러 환경에 관한 내용입니다.

  • Hadoop 2.5.1
  • Maven 3
  • IntelliJ IDEA

Procedure

설치 과정에 대한 요약입니다. 5번과 6번은 다음 포스트에서 이어서 다루도록 하겠습니다.

  1. hosts 수정
  2. Zookeeper 설치 & 설정
  3. HBase 설치 & 설정
  4. HBase 실행
  5. 간단한 쿼리를 이용한 설치 테스트
  6. Client API 사용한 테스트

1. /etc/hosts 수정

Hbase에서 peudo-distributed 모드를 정상적으로 사용하기 위해서는 /etc/hosts를 수정해주어야 합니다.

127.0.0.1   localhost

부분을 삭제 혹은 주석처리 후, 다음 예시와 같이 실제 IP를 localhost 로 지정해주어야 합니다.

a.b.c.d     localhost

2. Zookeeper 설치 & 설정

다음 페이지에서 Zookeeper를 다운로드합니다.
저는 Stable 버전인 3.4.6 버전을 다운로드 하였습니다.
압축 해제한 경로를 ${ZK_HOME}으로 정의합니다. ${ZK_HOME}/conf폴더로 이동하여 다음 명령어로 설정 파일을 생성합니다.

$ cp zoo_sample.cfg zoo.cfg

새로 생성한 파일 zoo.cfg에서 dataDir변수를 Zookeeper 데이터를 저장 할 경로를 지정해줍니다. 다른 설정값은 변경하지 않아도 무방합니다.
다음 명령어를 이용하여 정상적으로 Zookeeper Server가 수행되는지 확인합니다.

$ ${ZK_HOME}/bin/zkServer.sh start
$ ${ZK_HOME}/bin/zkCli.sh -server IP_Address:2181

quit명렁어로 클라이언트를 종료한 후, 다음 명령어로 다시 Zookeeper 서버를 종료합니다.

$ ${ZK_HOME}/bin/zkServer.sh stop

3. HBase 설치 & 설정

이 과정부터는 Hadoop 2.x 가 켜져 있는 상태임을 가정합니다. Hadoop이 켜져 있지 않은 상황이라면 Hadoop을 켜 주시기 바랍니다.
다음 페이지에서 HBase를 다운로드합니다.
다운로드 할 때는, Hadoop의 버전과 맞는 접미사를 다운로드해야 합니다. 이 문서에서는 Hadoop 2.5.1을 기반으로 하기 때문에 hbase-0.98.7-hadoop2-bin.tar.gz를 다운로드 하였습니다.
다운로드한 압축파일을 해제하고, 그 경로를 ${HBASE_HOME}으로 정의합니다.
HBase에서 수정해야 할 설정파일은 hbase-site.xmlhbase-env.sh 입니다.

hbase-site.xml

${HBASE_HOME}/conf/hbase-site.xml파일을 다음과 같이 설정합니다.

<configuration>
        <property>
                <name>hbase.rootdir</name>
                <value>hdfs://IP_ADDRESS:9000/hbase</value>
        </property>
        <property>
                <name>hbase.zookeeper.quorum</name>
                <value>IP_ADDRESS</value>
        </property>
        <property>
                <name>hbase.zookeeper.property.dataDir</name>
                <value>Zookeeper의 DataDir</value>
        </property>
        <property>
                <name>hbase.cluster.distributed</name>
                <value>true</value>
        </property>
        <property>
                <name>hbase.master.info.port</name>
                <value>60010</value>
        </property>
        <property>
                <name>hbase.master.info.bindAddress</name>
                <value>IP_ADDRESS</value>
        </property>
        <property>
                <name>dfs.support.append</name>
                <value>true</value>
        </property>
        <property>
                <name>dfs.datanode.max.xcievers</name>
                <value>4096</value>
        </property>
        <property>
                <name>hbase.zookeeper.property.clientPort</name>
                <value>2181</value>
        </property>
        <property>
                <name>hbase.regionserver.info.bindAddress</name>
                <value>IP_ADDRESS</value>
        </property>
</configuration>
hbase-env.sh

${HBASE_HOME}/conf/hbase-env.sh파일의 몇몇 변수를 다음과 같이 설정합니다.(주석 해제 후 값을 변경해 주면 됩니다.)

  • JAVA_HOME

    시스템에 설치되어 있는 JDK의 경로를 지정해줍니다. JDK6 or JDK7을 추천합니다. 참고

  • HBASEMANAGESZK

    true로 설정합니다.

다음은 설정의 예시입니다.

export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export HBASE_MANAGES_ZK=true

위 두 항목이 주석 해제되어 있고, 정상적인 값으로 설정되어있어야 합니다.

4. Hbase 실행

다음 명령어를 이용하여 HBase를 실행합니다.

$ ${HBASE_HOME}/bin/start-hbase.sh

다음 명령어를 이용하여 HDFS에 HBase 폴더가 정상적으로 생성되었는지 확인합니다.

$ ${HADOOP_HOME}/bin/hdfs dfs -ls /hbase

그리고 다음 명령어를 이용하여 Zookeeper에도 Hbase가 정상적으로 등록되었는지 확인합니다.
(${HADOOP_HOME}은 Hadoop이 설치되어 있는 경로입니다.)

${ZK_HOME}/bin/zkCli.sh -server IP_ADDRESS:2181
...
[zk:IP_ADDRESS:2181 (CONNECTED)] ls /

출력되는 항목에 hbase가 있어야 합니다.

Read More...
Spark RDD의 함수 동작 방식

이 글은 Spark의 RDD에 존재하는 함수들이 어떤 방식으로 동작되는지에 대한 글입니다.
Spark 의 공식 Documentation을 일부 번역하였습니다.
또한, Java 기준으로 설명 할 것이며, Scala 로 된 버전은 추후에 추가 작성할 계획입니다.
번역하기에 적절치 않은 용어들은 영문 단어 그대로 남겨놓았습니다.



WARNING

이 글은 최신버전을 기준으로 설명된 글이 아닙니다. 최신 버전을 대상으로 하였을 때와 설치 과정 혹은 출력 결과가 다를 수 있습니다.


Spark의 RDD

Spark는 Resilient Distributed Dataset, RDD 로 구성되어 있습니다. 이 RDD는 분산 형태로 처리 가능한 fault-tolerant collection 입니다. Spark에서 RDD를 생성하는 방법은 두 가지가 있습니다. 첫 번째로는 Driver 프로그램에서 이미 존재하는 colleciton을 parallelizing 시키는 방법이 있고, 다른 방법으로는 HDFS나 HBase혹은 Hadoop InputFormat 으로 수행 가능한 어떠한 데이터 소스를 referencing하는 방법이 있습니다.


Parallelized Collections

Java에서 Parallelized Collection을 생성하기 위해서는 JavaSparkContext클래스에 존재하는 parallelize 함수에 Driver 프로그램에서 사용한 Collection을 파라미터로 넘겨주면 됩니다. Collection에 존재하는 엘리먼트들은 Distributed Dataset으로 복사되고, 분산 형태로 연산됩니다. 1 에서 5 까지 값을 갖는 리스트를 parallelized collection으로 생성하는 방법은 다음 예와 같습니다.

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

한번 생성된 distributed dataset(distData)는 분산 형태로 연산될 수 있습니다. 예를들면, 다음과 같은 코드를 이용하여 리스트에 존재하는 모든 값을 더할 수 있습니다.

distData.reduce((a, b) -> a + b);

이러한 분산 형태의 연산에 대해서는 아래 Section 에서 설명할 것입니다.

이 Documentation에서는 Java 8 에서 제공하는 Lamda 문법을 사용하고 있습니다. Java 8을 사용할 수 없는 상황이어서 lambda 표현식을 사용하지 못할 때는, org.apache.spark.api.java.function 패키지를 구현하여 사용할 수 있습니다. 이에 대해서는 아래 Section에서 설명할 것입니다.

parallel collection에서 중요한 파라메터중 하나는 데이터셋을 몇 개의 slices로 나눌 것인지에 대한 것입니다. Spark는 각 cluster의 조각마다 하나의 태스크를 수행 하게 됩니다. 일반적으로 클러스터에서 각각의 CPU 마다 2 ~ 4 개의 slice를 합니다. Spark는 클러스터를 기반으로 slice의 수를 자동적으로 생성을 시도하지만, parallelize를 수행할 때 그 개수를 수동으로 지정해 줄 수 있습니다. (e.g. sc.parallelize(data, 10))


External Datasets

Spark에서는 Hadoop과 연관되는 모든 데이터 소스(Local File System, HDFS, Cassandra, HBase, Amazon S3, etc.)를 이용하여 distributed dataset을 생성할 수 있습니다. Spark는 텍스트파일, Sequence File과 모든 Hadoop InputFormat을 지원합니다.

텍스트 파일의 RDD는 SparkContexttextFile 함수를 사용하여 생성될 수 있으며, 이 함수는 파일의 URI를 이용하여 파일에 접근합니다. 그리고 텍스트 파일의 한 라인의 collection으로 읽고, 다음과 같은 형태로 파일을 불러옵니다.

JavaRDD<String> distFile = sc.textFile("data.txt");

한번 생성이 되면, distFile은 dataset 연산들을 수행할 수 있게 됩니다. 예를들면 mapreduce를 이용하여 모든 라인의 길이를 더한 갚을 구할때는 다음과 같이 명령어를 수행하면 됩니다.

distFile.map(s -> s.length()).reduce(a, b) -> a + b)

Spark를 이용하여 파일을 읽을 때, 다음 사항들을 참고할 수 있습니다.

  • Local File System 에 있는 파일에 접근할 때, 반드시 Worker node에서 접근 가능한 경로에 파일이 있어야 한다. 모든 Worker에게 파일을 복사하거나, network-mounted 로 공유된 파일 시스템을 사용해야 합니다.

  • Spark의 file-based 입력 함수는 폴더, 압축파일과 와일드카드 표현을 포함하여 사용될 수 있습니다. 예를 들면 textFile("/my/directory"), textFile("/my/directory/*.txt"), textFile("/my/directory/*/gz")가 모두 가능합니다.

Spark는 텍스트 파일 이외에도 많은 데이터 포맷을 제공합니다.

  • JavaSaprkContext.wholeTextFiles는 하나의 폴더 안에 존재하는 텍스트 파일을 읽습니다. 그리고 그것을 <파일이름, 내용> 형태의 쌍으로 리턴합니다. 이것은 한 파일을 읽어 각각의 줄에 대해 처리하는 textFile과는 다른 형태입니다.

  • SequenceFiles를 읽기 위해서는 SparkContext의 sequenceFile[K,V] 함수를 사용해야합니다. K 와 V는 해당 파일에서 사용하고 있는 Key와 Value의 타입입니다. 이것들은 Hadoop의 IntWritable 과 Text 클래스와 같이 Writable클래스의 subclass여야 합니다. Spark에서는 이러한 과정을 돕기 위해 몇 개의 일반적인 Writable 타입에 대해 native type을 제공합니다. 예를들어 sequenceFile[Int,String]을 사용하면, 이것은 IntWritableText로 인식됩니다.

  • 다른 Hadoop InputFormat을 사용하기 위해서는 JavaSparkContext.HadoopRDD함수를 사용해야 합니다. 이 함수는 임의의 JobConf와 InputFormat 클래스, Key 클래스, Value 클래스를 받습니다. Hadoop 에서 입력 소스를 지정하는 과정과 같은 방식으로 지정을 해야 합니다. 또한 새 맵리듀스 API(org.apache.hadoop.mapreduce 패키지에 존재하는 API)를 사용하기 위해서는JavaSparkContext.newHadoopRDD를 사용해야합니다.

  • JavaRDD.saveAsObjectFileJavaSparkContext.objectFile은 Serialized된 자바 객체 형태로 출력하는 것을 지원합니다. 이것은 Avro와 같이 특수화 된 형태보다는 효율적이지 않지만 간단한 형태로 어떠한 RDD도 저장 가능합니다.


RDD Operations

추후 번역

Read More...
Spark 기반의 추천 알고리즘 수행

이 글은 Spark를 기반으로 하여 추천 알고리즘을 수행하는 과정에 대한 것입니다.
추천에 사용되는 데이터셋은 MovieLens를 기준으로 하였습니다.

WARNING

이 글은 최신버전을 기준으로 설명된 글이 아닙니다. 최신 버전을 대상으로 하였을 때와 설치 과정 혹은 출력 결과가 다를 수 있습니다.


Environment


Dependency

Spark의 sub-project인 MLlib 프로젝트에서 이미 추천 알고리즘에 대한 라이브러리를 구현해 놓았습니다. 이를 사용하기 위해서 pom.xml에 다음과 같은 dependency를 추가합니다.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>1.1.0</version>
</dependency>


Recommendation Module

다음과 같은 형태로 추천 알고리즘을 사용하고 그 결과를 저장합니다.
JDK 1.7을 사용하였기 때문에 람다표현을 사용하지 않았습니다. Java 8을 사용하면 람다표현을 사용할 수 있으며, 좀 더 간략하게 Spark 프로그램을 작성할 수 있습니다.

SparkConf conf = new SparkConf().setAppName("Spark-recommendation").setMaster("yarn-cluster");
JavaSparkContext context = new JavaSparkContext(conf);

JavaRDD<String> data = context.textFile(INPUT_PATH);
JavaRDD<Rating> ratings = data.map(
        new Function<String, Rating>() {
            public Rating call(String s) {
                String[] sarray = s.split(delimiter);
                return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
                        Double.parseDouble(sarray[2]));
            }
        }
);

// Build the recommendation model using ALS
int rank = 10;
int numIterations = 20;
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);

// Evaluate the model on rating data
JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
        new Function<Rating, Tuple2<Object, Object>>() {
            public Tuple2<Object, Object> call(Rating r) {
                return new Tuple2<Object, Object>(r.user(), r.product());
            }
        }
);
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
        model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
                new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
                    public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r) {
                        return new Tuple2<Tuple2<Integer, Integer>, Double>(
                                new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
                    }
                }
        ));

//<<Integer,Integer>,Double> to <Integer,<Integer,Double>>
JavaPairRDD<Integer, Tuple2<Integer, Double>> userPredictions = JavaPairRDD.fromJavaRDD(predictions.map(
        new Function<Tuple2<Tuple2<Integer, Integer>, Double>, Tuple2<Integer, Tuple2<Integer, Double>>>() {
            @Override
            public Tuple2<Integer, Tuple2<Integer, Double>> call(Tuple2<Tuple2<Integer, Integer>, Double> v1) throws Exception {
                return new Tuple2<Integer, Tuple2<Integer, Double>>(v1._1()._1(), new Tuple2<Integer, Double>(v1._1()._2(), v1._2()));
            }
        }
));

//Sort by key & Save
userPredictions.sortByKey(true).saveAsTextFile(OUTPUT_PATH);
context.stop();

전체 소스코드는 github repository에 있습니다.


Preparation

Input file은 Spark repositoryALS 샘플 데이터를 이용하였습니다.
해당 파일을 HDFS에 업로드 한 후, Input File 로 사용합니다.
위 코드에서는 INPUT_PATH로 정의되었지만, github repository 에서는 Apache Commons-cli 를 이용하여 입력받았습니다.

Run Spark Application

maven을 이용하여 프로젝트를 패키징 합니다.

$ mvn package

다음과 같은 명령어를 이용하여 작성한 Wordcount application을 YARN에 submit 합니다.

$ spark-submit --class 패키지명.클래스명 --master yarn-cluster Package된Jar파일.jar

(github repository에는 Apache Commons-cli 를 이용하여 실제 수행 command는 뒤에 옵션이 추가로 붙습니다.)


Result

수행 결과로 다음과 같은 결과가 출력됩니다.

(0,(34,0.9846535656842613))
(0,(96,0.8178838683876802))
...
(1,(96,1.2547672185210839))
(1,(4,1.941481009392396))
...
(29,(86,1.0588376599353693))
(29,(68,3.3195965377284837))

위 결과는 각각의 사용자 0 ~ 29에 대해 영화별 평점을 예측한 수치입니다.

Read More...