【Spark:0】Trying it out locally

Introduction

Next up, let's try installing Spark.
First, we'll run it locally.

Environment

3 physical servers (x86_64)

CentOS 6.4 (upgraded from 6.2 via yum update)

OpenJDK 6

CDH4.2.1

Installation

  • Installing Spark is just a matter of downloading the tarball and extracting it.

Install log

[root@uldata14 ~]# curl -L -O http://spark-project.org/download/spark-0.8.0-incubating-bin-cdh4.tgz
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  134M  100  134M    0     0   861k      0  0:02:39  0:02:39 --:--:--  955k

[root@uldata14 ~]# tar -zxvf spark-0.8.0-incubating-bin-cdh4.tgz 

Test

Let's try the first example from the following document:
Quick Start - Spark Documentation

[root@uldata14 ~]# cd spark-0.8.0-incubating-bin-cdh4
[root@uldata14 spark-0.8.0-incubating-bin-cdh4]# ./spark-shell
Welcome to

.
.
.

Using Scala version 2.9.3 (OpenJDK 64-Bit Server VM, Java 1.6.0_24)
Initializing interpreter...
.
.
.
.
Spark context available as sc.
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

The shell came up.
At this point, the Web UI is already viewable on port 4040.


Now then, straight from the tutorial.

scala> val textFile = sc.textFile("README.md")
xx/xx/xx xx:xx:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
xx/xx/xx xx:xx:18 INFO storage.MemoryStore: ensureFreeSpace(122019) called with curMem=0, maxMem=339585269
xx/xx/xx xx:xx:18 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 119.2 KB, free 323.7 MB)
textFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> textFile.count()
xx/xx/xx xx:xx:27 INFO mapred.FileInputFormat: Total input paths to process : 1
xx/xx/xx xx:xx:27 INFO spark.SparkContext: Starting job: count at <console>:15
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Got job 0 (count at <console>:15) with 1 output partitions (allowLocal=false)
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (count at <console>:15)
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Parents of final stage: List()
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Missing parents: List()
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[1] at textFile at <console>:12), which has no missing parents
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[1] at textFile at <console>:12)
xx/xx/xx xx:xx:27 INFO local.LocalTaskSetManager: Size of task 0 is 1586 bytes
xx/xx/xx xx:xx:27 INFO local.LocalScheduler: Running 0
xx/xx/xx xx:xx:27 INFO rdd.HadoopRDD: Input split: file:/root/spark-0.8.0-incubating-bin-cdh4/README.md:0+4215
xx/xx/xx xx:xx:27 INFO local.LocalScheduler: Finished 0
xx/xx/xx xx:xx:27 INFO local.LocalScheduler: Remove TaskSet 0.0 from pool 
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Stage 0 (count at <console>:15) finished in 0.077 s
xx/xx/xx xx:xx:27 INFO spark.SparkContext: Job finished: count at <console>:15, took 0.198733219 s
res0: Long = 111

scala> textFile.first()
xx/xx/xx xx:xx:52 INFO spark.SparkContext: Starting job: first at <console>:15
xx/xx/xx xx:xx:52 INFO scheduler.DAGScheduler: Got job 1 (first at <console>:15) with 1 output partitions (allowLocal=true)
xx/xx/xx xx:xx:52 INFO scheduler.DAGScheduler: Final stage: Stage 1 (first at <console>:15)
xx/xx/xx xx:xx:52 INFO scheduler.DAGScheduler: Parents of final stage: List()
xx/xx/xx xx:xx:52 INFO scheduler.DAGScheduler: Missing parents: List()
xx/xx/xx xx:xx:52 INFO scheduler.DAGScheduler: Computing the requested partition locally
xx/xx/xx xx:xx:52 INFO rdd.HadoopRDD: Input split: file:/root/spark-0.8.0-incubating-bin-cdh4/README.md:0+4215
xx/xx/xx xx:xx:52 INFO spark.SparkContext: Job finished: first at <console>:15, took 0.006843296 s
res1: String = # Apache Spark

And that's what it looks like.
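
For reference, the Quick Start continues from here by chaining transformations onto actions. A quick sketch of its next examples (same shell; log output omitted):

// lines that contain the word "Spark"
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark.count()

// word count of the longest line in the file
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)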

But this warning:

WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

bothers me, so let's try accessing HDFS next.

As source data, let's use a Wikipedia dump.

curl -O http://dumps.wikimedia.org/jawiki/latest/jawiki-latest-pages-articles.xml.bz2
bunzip2 jawiki-latest-pages-articles.xml.bz2

Uncompressed, it comes to 7,685,681,088 bytes (about 7.2 GiB).

Load it into HDFS beforehand:

sudo -u hdfs hadoop fs -put /tmp/jawiki-latest-pages-articles.xml /jawiki-latest-pages-articles.xml

scala> val textFile = sc.textFile("hdfs://10.29.254.14:8020/jawiki-latest-pages-articles.xml")
.
.
textFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> textFile.count()
.
.
.

res0: Long = 117807835

scala> textFile.first()
.
.
res1: String = <mediawiki xmlns="http://www.mediawiki.org/xml/export-0.8/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.8/ http://www.mediawiki.org/xml/export-0.8.xsd" version="0.8" xml:lang="ja">

It seems to be reading the file fine, at least.
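
Since the dump is readable, a natural follow-up would be the classic word count over it. A minimal sketch, not run as part of this test (against roughly 7 GiB of XML, local mode will take a while); reduceByKey comes from the pair-RDD conversions that spark-shell imports automatically:

// split each line into words, emit (word, 1) pairs, and sum the counts per word
val counts = textFile.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey((a, b) => a + b)

// peek at a few of the resulting (word, count) pairs
counts.take(10)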

During the count, the Web UI looked like this. (Screenshot of the Spark Web UI.)

Additional test

I hadn't tried the cache feature yet, so here's an extra test.

  • use README.md
  • count without caching
  • call cache()
  • then run count a first time
  • then run count a second time

scala> val textFile = sc.textFile("README.md")


scala> textFile.count()
.
.
res0: Long = 111

scala> textFile.cache()
res1: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> textFile.count()
.
.
xxxx INFO spark.CacheManager: Cache key is rdd_1_0
xxxx INFO spark.CacheManager: Computing partition org.apache.spark.rdd.HadoopPartition@691
xxxx INFO rdd.HadoopRDD: Input split: file:/root/spark-0.8.0-incubating-bin-cdh4/README.md:0+4215
xxxx INFO storage.MemoryStore: ensureFreeSpace(14288) called with curMem=122019, maxMem=339585269
xxxx INFO storage.MemoryStore: Block rdd_1_0 stored as values to memory (estimated size 14.0 KB, free 323.7 MB)
xxxx INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_1_0 in memory on uldata14.hadoop.local:34076 (size: 14.0 KB, free: 323.8 MB)
xxxx INFO storage.BlockManagerMaster: Updated info of block rdd_1_0
.
res2: Long = 111

scala> textFile.count()
.
.
xxxx INFO spark.CacheManager: Cache key is rdd_1_0
xxxx INFO spark.CacheManager: Found partition in cache!
.
res3: Long = 111

Results

  • Without cache
    • CPU time: 46 ms
  • With cache
    • 1st count CPU time: 19 ms
    • 2nd count CPU time: 2 ms
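
As the log above shows, cache() is lazy: it only marks the RDD for caching, the block (rdd_1_0) is computed and stored during the first count that follows, and the second count is served straight from memory. To get rough wall-clock numbers out of the shell itself, a helper like this would do. A minimal sketch; the time function is my own, not part of Spark (written without string interpolation to stay Scala 2.9-compatible):

// rough wall-clock timing for shell experiments
def time[A](body: => A): A = {
  val start = System.nanoTime()
  val result = body
  println("took " + ((System.nanoTime() - start) / 1000000.0) + " ms")
  result
}

time { textFile.count() }  // 1st count after cache(): computes and stores rdd_1_0
time { textFile.count() }  // 2nd count: served from the in-memory block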