【Spark:0】Trying It Locally
Introduction
Next, I'll try installing Spark.
To start with, I'll run it locally.
Installation
- Installing Spark is just a matter of downloading the archive and extracting it.
Installation log
[root@uldata14 ~]# curl -L -O http://spark-project.org/download/spark-0.8.0-incubating-bin-cdh4.tgz
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  134M  100  134M    0     0   861k      0  0:02:39  0:02:39 --:--:--  955k
[root@uldata14 ~]# tar -zxvf spark-0.8.0-incubating-bin-cdh4.tgz
Test
I'll try the first example from the following document.
Quick Start - Spark 1.5.2 Documentation
[root@uldata14 ~]# cd spark-0.8.0-incubating-bin-cdh4
[root@uldata14 spark-0.8.0-incubating-bin-cdh4]# ./spark-shell
Welcome to
.
.
.
Using Scala version 2.9.3 (OpenJDK 64-Bit Server VM, Java 1.6.0_24)
Initializing interpreter...
.
.
.
.
Spark context available as sc.
Type in expressions to have them evaluated.
Type :help for more information.
scala>
The shell is up.
At this point, the WebUI is already available on port 4040.
Now, straight from the tutorial:
scala> val textFile = sc.textFile("README.md")
xx/xx/xx xx:xx:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
xx/xx/xx xx:xx:18 INFO storage.MemoryStore: ensureFreeSpace(122019) called with curMem=0, maxMem=339585269
xx/xx/xx xx:xx:18 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 119.2 KB, free 323.7 MB)
textFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
scala> textFile.count()
xx/xx/xx xx:xx:27 INFO mapred.FileInputFormat: Total input paths to process : 1
xx/xx/xx xx:xx:27 INFO spark.SparkContext: Starting job: count at <console>:15
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Got job 0 (count at <console>:15) with 1 output partitions (allowLocal=false)
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (count at <console>:15)
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Parents of final stage: List()
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Missing parents: List()
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[1] at textFile at <console>:12), which has no missing parents
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[1] at textFile at <console>:12)
xx/xx/xx xx:xx:27 INFO local.LocalTaskSetManager: Size of task 0 is 1586 bytes
xx/xx/xx xx:xx:27 INFO local.LocalScheduler: Running 0
xx/xx/xx xx:xx:27 INFO rdd.HadoopRDD: Input split: file:/root/spark-0.8.0-incubating-bin-cdh4/README.md:0+4215
xx/xx/xx xx:xx:27 INFO local.LocalScheduler: Finished 0
xx/xx/xx xx:xx:27 INFO local.LocalScheduler: Remove TaskSet 0.0 from pool
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
xx/xx/xx xx:xx:27 INFO scheduler.DAGScheduler: Stage 0 (count at <console>:15) finished in 0.077 s
xx/xx/xx xx:xx:27 INFO spark.SparkContext: Job finished: count at <console>:15, took 0.198733219 s
res0: Long = 111
scala> textFile.first()
xx/xx/xx xx:xx:52 INFO spark.SparkContext: Starting job: first at <console>:15
xx/xx/xx xx:xx:52 INFO scheduler.DAGScheduler: Got job 1 (first at <console>:15) with 1 output partitions (allowLocal=true)
xx/xx/xx xx:xx:52 INFO scheduler.DAGScheduler: Final stage: Stage 1 (first at <console>:15)
xx/xx/xx xx:xx:52 INFO scheduler.DAGScheduler: Parents of final stage: List()
xx/xx/xx xx:xx:52 INFO scheduler.DAGScheduler: Missing parents: List()
xx/xx/xx xx:xx:52 INFO scheduler.DAGScheduler: Computing the requested partition locally
xx/xx/xx xx:xx:52 INFO rdd.HadoopRDD: Input split: file:/root/spark-0.8.0-incubating-bin-cdh4/README.md:0+4215
xx/xx/xx xx:xx:52 INFO spark.SparkContext: Job finished: first at <console>:15, took 0.006843296 s
res1: String = # Apache Spark
That's about it.
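As a sanity check outside Spark, the two results above (res0: Long = 111 and res1: String = # Apache Spark) can be compared with a plain-Scala read of the same file. This is only a sketch under assumptions, not how Spark computes count() or first(): LocalCheck, countLines and firstLine are hypothetical names, and it assumes the file is UTF-8 and small enough to read on a single machine. Line splitting should normally agree with Hadoop's text input for \n and \r\n line endings.

```scala
import scala.io.Source

// Hypothetical helper: reads the file directly on one machine, without Spark.
object LocalCheck {
  // Number of lines; comparable to sc.textFile(path).count()
  def countLines(path: String): Long = {
    val src = Source.fromFile(path, "UTF-8")
    try src.getLines().foldLeft(0L)((n, _) => n + 1)
    finally src.close()
  }

  // First line; comparable to sc.textFile(path).first(). None for an empty file.
  def firstLine(path: String): Option[String] = {
    val src = Source.fromFile(path, "UTF-8")
    try {
      val lines = src.getLines()
      if (lines.hasNext) Some(lines.next()) else None
    } finally src.close()
  }

  def main(args: Array[String]): Unit = {
    // Defaults to README.md in the current directory, as in the tutorial
    val path = if (args.nonEmpty) args(0) else "README.md"
    println("count = " + countLines(path))
    println("first = " + firstLine(path).getOrElse("<empty file>"))
  }
}
```

Run from the extracted spark-0.8.0-incubating-bin-cdh4 directory, it should print the same line count and first line as the spark-shell session.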
However, this warning bothers me:
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
so next I'll try reading from HDFS.
For the source data, I'll use the Japanese Wikipedia dump.
curl -O http://dumps.wikimedia.org/jawiki/latest/jawiki-latest-pages-articles.xml.bz2
bunzip2 jawiki-latest-pages-articles.xml.bz2
The decompressed file is 7685681088 bytes (about 7.7 GB).
sudo -u hdfs hadoop fs -put /tmp/jawiki-latest-pages-articles.xml /jawiki-latest-pages-articles.xml
This stores the file in HDFS.
scala> val textFile = sc.textFile("hdfs://10.29.254.14:8020/jawiki-latest-pages-articles.xml")
.
.
textFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
scala> textFile.count()
.
.
.
res0: Long = 117807835
scala> textFile.first()
.
.
res1: String = <mediawiki xmlns="http://www.mediawiki.org/xml/export-0.8/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.8/ http://www.mediawiki.org/xml/export-0.8.xsd" version="0.8" xml:lang="ja">
It seems to be reading the file, at least.
Here is what the WebUI looked like during the count.
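The count runs as one task per input split, so the number of tasks in the WebUI can be estimated in advance. The sketch below assumes Hadoop's FileInputFormat rule for a single uncompressed file (keep cutting full splits while more than 1.1 split sizes remain, then one final split for the remainder) and assumes the split size equals the HDFS block size. The cluster's actual dfs.blocksize is not stated here, so both 64 MB and 128 MB are tried. SplitEstimate and numSplits are hypothetical names.

```scala
// Hypothetical estimator; assumes split size == HDFS block size.
object SplitEstimate {
  // FileInputFormat keeps cutting full splits while the remainder
  // is more than 1.1 split sizes; the leftover becomes one last split.
  private val SplitSlop = 1.1

  def numSplits(fileSize: Long, splitSize: Long): Int = {
    require(fileSize >= 0 && splitSize > 0)
    var remaining = fileSize
    var splits = 0
    while (remaining.toDouble / splitSize > SplitSlop) {
      remaining -= splitSize
      splits += 1
    }
    if (remaining != 0) splits += 1 // final, possibly short, split
    splits
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // README.md is 4215 bytes, per the "Input split" log line
    println("README.md: " + numSplits(4215L, 64 * mb) + " split(s)")
    // Decompressed jawiki dump: 7685681088 bytes; block size is an assumption
    for (blockMb <- Seq(64L, 128L))
      println("jawiki, " + blockMb + " MB blocks: " + numSplits(7685681088L, blockMb * mb) + " splits")
  }
}
```

For README.md this gives 1 split, consistent with the single partition in the local run; for the dump it gives 115 splits with 64 MB blocks or 58 with 128 MB blocks.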
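Additional test
I hadn't tried the cache feature yet, so here is an additional test.
- Use README.md
- Run count without cache
- Mark the RDD with cache()
- Then run count (1st time)
- Then run count (2nd time)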
scala> val textFile = sc.textFile("README.md")
scala> textFile.count()
.
.
res0: Long = 111
scala> textFile.cache()
res1: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
scala> textFile.count()
.
.
xxxx INFO spark.CacheManager: Cache key is rdd_1_0
xxxx INFO spark.CacheManager: Computing partition org.apache.spark.rdd.HadoopPartition@691
xxxx INFO rdd.HadoopRDD: Input split: file:/root/spark-0.8.0-incubating-bin-cdh4/README.md:0+4215
xxxx INFO storage.MemoryStore: ensureFreeSpace(14288) called with curMem=122019, maxMem=339585269
xxxx INFO storage.MemoryStore: Block rdd_1_0 stored as values to memory (estimated size 14.0 KB, free 323.7 MB)
xxxx INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_1_0 in memory on uldata14.hadoop.local:34076 (size: 14.0 KB, free: 323.8 MB)
xxxx INFO storage.BlockManagerMaster: Updated info of block rdd_1_0
.
res2: Long = 111
scala> textFile.count()
.
.
xxxx INFO spark.CacheManager: Cache key is rdd_1_0
xxxx INFO spark.CacheManager: Found partition in cache!
.
res3: Long = 111
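The log shows that cache() itself computes nothing: the first count after it reads the file ("Computing partition ...") and stores the block in memory, and the second count reads it back ("Found partition in cache!"). The toy model below reproduces only that behaviour in plain Scala. It is a hypothetical illustration (ToyRdd, computeCount and ToyRddDemo are made-up names), not Spark's CacheManager or MemoryStore implementation.

```scala
import scala.collection.mutable

// Hypothetical toy model of RDD caching; NOT Spark's actual code.
final class ToyRdd[T](partitions: Seq[() => Seq[T]]) {
  private var persist = false                         // set by cache(); nothing runs yet
  private val store = mutable.Map.empty[Int, Seq[T]]  // partition index -> stored rows
  private var computed = 0                            // partition computations so far

  def computeCount: Int = computed

  // Like RDD.cache(): only marks the dataset for caching and returns it.
  def cache(): this.type = { persist = true; this }

  // Serve a partition from the store if present; otherwise compute it,
  // keeping the result only when caching has been requested.
  private def partition(i: Int): Seq[T] = store.get(i) match {
    case Some(rows) => rows                 // cf. "Found partition in cache!"
    case None =>
      computed += 1                         // cf. "Computing partition ..."
      val rows = partitions(i)()
      if (persist) store(i) = rows          // cf. "Block rdd_1_0 stored as values to memory"
      rows
  }

  // An action: forces every partition and counts the rows.
  def count(): Long = partitions.indices.map(i => partition(i).size.toLong).sum
}

object ToyRddDemo {
  def main(args: Array[String]): Unit = {
    // Two fake partitions standing in for input splits
    val rdd = new ToyRdd(Seq(() => Seq("a", "b"), () => Seq("c")))
    println(rdd.count() + " rows, computations: " + rdd.computeCount) // no cache yet
    rdd.cache()                                                       // marks only
    println(rdd.count() + " rows, computations: " + rdd.computeCount) // computes and stores
    println(rdd.count() + " rows, computations: " + rdd.computeCount) // served from the store
  }
}
```

The computation counter grows on the first two counts and stays flat on the third, mirroring the three count() calls in the session above.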
Results
- Without cache
  - CPU time: 46 ms
- With cache
  - 1st count, CPU time: 19 ms
  - 2nd count, CPU time: 2 ms
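To repeat a comparison like this from the shell itself, a small timing wrapper is enough. Timing.timed is a hypothetical helper, not part of Spark. It measures elapsed wall-clock time with System.nanoTime, so its numbers include scheduling overhead and will not match the CPU-time figures above.

```scala
// Hypothetical helper; measures wall-clock time, not CPU time.
object Timing {
  // Evaluates body once and returns its result with the elapsed milliseconds.
  def timed[A](body: => A): (A, Double) = {
    val start = System.nanoTime()
    val result = body
    val elapsedMs = (System.nanoTime() - start) / 1e6
    (result, elapsedMs)
  }

  def main(args: Array[String]): Unit = {
    val (sum, ms) = timed((1 to 1000000).map(_.toLong).sum)
    println("sum = " + sum + ", took " + ms + " ms")
  }
}
```

In spark-shell, something like `val (n, ms) = Timing.timed(textFile.count())` should work, run once before textFile.cache() and twice after it.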