Spark本地模式运行

2015.03.30 | Comments

Spark的安装分为几种模式,其中一种是本地运行模式,只需要在单节点上解压即可运行,这种模式不需要依赖Hadoop 环境。在本地运行模式中,master和worker都运行在一个jvm进程中,通过该模式,可以快速的测试Spark的功能。

下载 Spark

下载地址为http://spark.apache.org/downloads.html,根据页面提示选择一个合适的版本下载,这里我下载的是 spark-1.3.0-bin-cdh4.tgz。下载之后解压:

 cd ~
 wget http://mirror.bit.edu.cn/apache/spark/spark-1.3.0/spark-1.3.0-bin-cdh4.tgz
 tar -xf spark-1.3.0-bin-cdh4.tgz
 cd spark-1.3.0-bin-cdh4

下载之后的目录为:

⇒  tree -L 1
.
├── CHANGES.txt
├── LICENSE
├── NOTICE
├── README.md
├── RELEASE
├── bin
├── conf
├── data
├── ec2
├── examples
├── lib
├── python
└── sbin

运行 spark-shell

本地模式运行spark-shell非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME

$ MASTER=local 
$ bin/spark-shell

MASTER=local就是表明当前运行在单机模式。如果一切顺利,将看到下面的提示信息:

Created spark context..
Spark context available as sc.

这表明spark-shell中已经内置了Spark context的变量,名称为sc,我们可以直接使用该变量进行后续的操作。

spark-shell 后面设置 master 参数,可以支持更多的模式,请参考 http://spark.apache.org/docs/latest/submitting-applications.html#master-urls

我们在sparkshell中运行一下最简单的例子,统计在README.md中含有Spark的行数有多少,在spark-shell中输入如下代码:

scala>sc.textFile("README.md").filter(_.contains("Spark")).count

如果你觉得输出的日志太多,你可以从模板文件创建 conf/log4j.properties :

$ mv conf/log4j.properties.template conf/log4j.properties

然后修改日志输出级别为WARN

log4j.rootCategory=WARN, console

如果你设置的 log4j 日志等级为 INFO,则你可以看到这样的一行日志 INFO SparkUI: Started SparkUI at http://10.9.4.165:4040,意思是 Spark 启动了一个 web 服务器,你可以通过浏览器访问http://10.9.4.165:4040来查看 Spark 的任务运行状态等信息。

pyspark

运行 bin/pyspark 的输出为:

$ bin/pyspark
Python 2.7.6 (default, Sep  9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
15/03/30 15:19:07 WARN Utils: Your hostname, june-mac resolves to a loopback address: 127.0.0.1; using 10.9.4.165 instead (on interface utun0)
15/03/30 15:19:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/03/30 15:19:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ / __/  _/
   /__ / .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Python version 2.7.6 (default, Sep  9 2014 15:04:36)
SparkContext available as sc, HiveContext available as sqlCtx.

你也可以使用 IPython 来运行 Spark:

IPYTHON=1  ./bin/pyspark

如果要使用 IPython NoteBook,则运行:

IPYTHON_OPTS="notebook"  ./bin/pyspark

从日志可以看到,不管是 bin/pyspark 还是 bin/spark-shell,他们都有两个内置的变量:sc 和 sqlCtx。

SparkContext available as sc, HiveContext available as sqlCtx

sc 代表着 Spark 的上下文,通过该变量可以执行 Spark 的一些操作,而 sqlCtx 代表着 HiveContext 的上下文。

spark-submit

在Spark1.0之后提供了一个统一的脚本spark-submit来提交任务。

对于 python 程序,我们可以直接使用 spark-submit:

$ mkdir -p /usr/lib/spark/examples/python
$ tar zxvf /usr/lib/spark/lib/python.tar.gz -C /usr/lib/spark/examples/python

$ ./bin/spark-submit examples/python/pi.py 10

对于 Java 程序,我们需要先编译代码然后打包运行:

$ spark-submit --class "SimpleApp" --master local[4] simple-project-1.0.jar

测试 RDD

在 Spark 中,我们操作的集合被称为 RDD,他们被并行拷贝到集群各个节点上。我们可以通过 sc 来创建 RDD 。

创建 RDD 有两种方式:

  • sc.parallelize()
  • sc.textFile()

使用 Scala 对 RDD 的一些操作:

val rdd1=sc.parallelize(List(1,2,3,3))
val rdd2=sc.parallelize(List(3,4,5))

//转换操作
rdd1.map(2*).collect //等同于:rdd1.map(t=>2*t).collect
//Array[Int] = Array(2, 4, 6, 6)

rdd1.filter(_>2).collect
//Array[Int] = Array(3, 3)

rdd1.flatMap(_ to 4).collect
//Array[Int] = Array(1, 2, 3, 4, 2, 3, 4, 3, 4, 3, 4)

rdd1.sample(false, 0.3, 4).collect
//Array[Int] = Array(3, 3)

rdd1.sample(true, 0.3, 4).collect
//Array[Int] = Array(3)

rdd1.union(rdd2).collect
//Array[Int] = Array(1, 2, 3, 3, 3, 4, 5)

rdd1.distinct().collect
//Array[Int] = Array(1, 2, 3)

rdd1.map(i=>(i,i)).groupByKey.collect
//Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(1)), (2,CompactBuffer(2)), (3,CompactBuffer(3, 3)))

rdd1.map(i=>(i,i)).reduceByKey(_ + _).collect
//Array[(Int, Int)] = Array((1,1), (2,2), (3,6))

rdd1.map(i=>(i,i)).sortByKey(false).collect
//Array[(Int, Int)] = Array((3,3), (3,3), (2,2), (1,1))

rdd1.map(i=>(i,i)).join(rdd2.map(i=>(i,i))).collect
//Array[(Int, (Int, Int))] = Array((3,(3,3)), (3,(3,3)))

rdd1.map(i=>(i,i)).cogroup(rdd2.map(i=>(i,i))).collect
//Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((4,(CompactBuffer(),CompactBuffer(4))), (1,(CompactBuffer(1),CompactBuffer())), (5,(CompactBuffer(),CompactBuffer(5))), (2,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(3, 3),CompactBuffer(3))))

rdd1.cartesian(rdd2).collect()
//Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5), (3,3), (3,4), (3,5))

rdd1.pipe("head -n 1").collect
//Array[String] = Array(1, 2, 3, 3)

//动作操作
rdd1.reduce(_ + _)
//Int = 9

rdd1.collect
//Array[Int] = Array(1, 2, 3, 3)

rdd1.first()
//Int = 1

rdd1.take(2)
//Array[Int] = Array(1, 2)

rdd1.top(2)
//Array[Int] = Array(3, 3)

rdd1.takeOrdered(2)
//Array[Int] = Array(1, 2)

rdd1.map(i=>(i,i)).countByKey()
//scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)

rdd1.countByValue()
//scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)

rdd1.intersection(rdd2).collect()
//Array[Int] = Array(3)

rdd1.subtract(rdd2).collect()
//Array[Int] = Array(1, 2)

rdd1.foreach(println)
//3
//2
//3
//1

rdd1.foreachPartition(x => println(x.reduce(_ + _)))

更多例子,参考http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html


原创文章,转载请注明: 转载自JavaChen Blog,作者:JavaChen
本文链接地址:http://blog.javachen.com/2015/03/30/spark-test-in-local-mode.html
本文基于署名2.5中国大陆许可协议发布,欢迎转载、演绎或用于商业目的,但是必须保留本文署名和文章链接。 如您有任何疑问或者授权方面的协商,请邮件联系我。