Spark小结及性能调优

    最近转战Spark相关领域,编写了spark版本的LDA算法,在此过程中对程序不断调优同时也对spark系统相关参数进行调优,得到了一下一些经验。

    首先是对于RDD转化为PariRDD,初学者可能会有很多疑问,为什么通过map将RDD转化为tuple后应该有相应的PairRDD操作,但是IDE并么有提示,原因是缺少隐式转换,加入  import org.apache.spark.SparkContext._  即可解决。

    其次,有些情况下combineByKey的性能是十分好的,相当于在每个paritition处做combiner,减少网络传输和处理时间。

    下面是一些参数调整:

     1、spark.driver.maxResultSize 8g    driver获得处理结果的最大内存数,由于我要处理大矩阵,所以这个参数还是不得不改的

     2、spark.yarn.executor.memoryOverhead  2048    跑了一段时间后发现很多executor堆外内存占用过大,采用这个参数后稍好

     3、spark.shuffle.blockTransferService nio     spark 1.2.0以后shuffle service改为了netty,这个很扯淡,我改为nio后堆外内存较少了很多,同时处理时间提示提升了一倍

     先写到这儿,感谢同事何良均提供第三个参数,他的blog地址是http://blog.csdn.net/hit_hlj_sgy/article/list/2

     我的spark LDA github地址是https://github.com/jiangyu/SparkLDA

Spark 编译及部署

    Spark编译不是很复杂,使用maven就可以编译,但对maven的内存要求较高。

    1、export MAVEN_OPTS=”-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m”

    2、mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package

          因为我是要跑在yarn上,其中要指定hadoop版本。

    除了第一次需要下载大量jar包,其它编译大概需要20分钟左右,编译好后在$SPARK_HOME/assembly/target/scala-2.10/ 下面有个jar包,大概100多兆,这是我们部署用的。


    Spark部署和配置也不难,从官网下载发布包(非刚才的源码包)即可,需要以下几步就可以部署使用。

    1、$SPARK_HOME/lib  下替换spark-assmbly的jar包,替换为刚才编译出来的jar包

    2、在$SPARK_HOME/conf  配置spark-env.sh

export SPARK_YARN_JAR==hdfs://ns1/spark/jar/spark-assembly-1.2.1-hadoop2.4.0.jar                #上传刚编译的jar包到dfs上,让所有worked都能get到

export HADOOP_CONF_DIR=/usr/local/hadoop-2.4.0/etc/hadoop                    #Hadoop配置路径

export YARN_CONF_DIR=$HADOOP_CONF_DIR              #Yarn配置路径

export SPARK_DAEMON_JAVA_OPTS=”-Djava.library.path=/usr/local/hadoop-2.4.0/lib/native/ -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data0/hadoop/gclog/heap.dump.spark”

export SPARK_SUBMIT_LIBRARY_PATH=/usr/local/hadoop-2.4.0/lib/native/         #LD_LIBRARY使用,调用native库用,如果使用lzo等配置好

     

     3、在$SPARK_HOME/conf下配置spark-default.sh

 

 

spark.yarn.historyServer.address *.*.*.*:18080     #yarn historyserver地址

 

spark.eventLog.enabled true

 

spark.eventLog.dir   hdfs://ns1/spark/logs

 

 

    至此,Spark配置完成,如果在YARN上跑一下测试可以用$SPARK_HOME/bin/spark-shell —master yarn-client


Spark SQL部署指南

      最近一直在看Spark以及Hive相关的东西,Spark SQL就是使用Hive SQL Parser将hive语句转化为RDD,方便Hive用户平滑迁移。

      首先就是如何打包:

       1、从github上下载源码,地址是https://github.com/apache/spark,我选取的是1.0.2(1.1.0在提交时候出现Permission denied问题,我放弃了)

      2、安装maven,scala等,Hadoop集群,并设定home目录

      3、通过Maven打包前修改Maven的jvm参数,export MAVEN_OPTS=”-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512”

      4、根据HADOOP版本通过MAVEN打包,比如对2.4版本 bash make-distribution.sh –name spark-1.0.2 –tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 打包出来是一个tgz的包,上传到部署的机器上 部署问题:

      5、解压tgz的包,将hive需要的mysql-jdbc的jar包拷贝到lib下,同时将hive配置文件hive-site.xml配置好拷贝到conf下

      6、修改配置文件,修改spark-default.conf

spark.yarn.historyServer.address 10.39.5.23:18080 // spark history server position
spark.eventLog.enabled true
spark.eventLog.dir   hdfs://ns1/user/jiangyu2/logs

修改spark-env.sh

export SPARK_JAR=hdfs://ns1/user/jiangyu2/spark-assembly-1.1.0-SNAPSHOT-hadoop2.4.0.jar
export HADOOP_CONF_DIR=/usr/local/hadoop-2.4.0/etc/hadoop
export YARN_CONF_DIR=/usr/local/hadoop-2.4.0/etc/hadoop
export SPARK_YARN_USER_ENV="CLASSPATH=/usr/local/hadoop-2.4.0/etc/hadoop/"
export SPARK_SUBMIT_LIBRARY_PATH=/usr/local/hadoop-2.4.0/lib/native/
export SPARK_CLASSPATH=$SPARK_CLASSPATH

7、启动spark shell

$ ./bin/spark-shell --master yarn-client
 
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.min.split.size.per.rack is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.rack
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.min.split.size.per.node is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.node
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@1f3b0168
 
scala> import hiveContext._
import hiveContext._
 
scala> hql("SELECT t1.fans_uid,t3.user_type FROM   (SELECT fans_uid,atten_uid,time FROM   ods_user_fanslist WHERE  dt = '20140909') t1  JOIN (SELECT uid,user_type_id AS user_type,user_status,reg_time FROM   mds_user_info WHERE  dt = '20140909') t3 ON t1.atten_uid = t3.uid limit 10”).collect().foreach(println)

8、写java程序提交同样的hql

package nimei;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;
 
public class NiDaYe {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("caocaocao");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
 
    JavaHiveContext hiveCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(ctx);
 
    // Queries are expressed in HiveQL.
    hiveCtx.hql("SELECT count(fans_uid),user_status from(SELECT fans_uid,atten_uid,time FROM   ods_user_fanslist WHERE  dt = '20140909') " +
    		"t1  JOIN (SELECT uid,user_type_id AS user_type,user_status,reg_time FROM  " +
    		" mds_user_info WHERE  dt = '20140909') t3 ON t1.atten_uid = t3.uid group by user_status").collect();
  }
}

  最后就是applicationMaster页面了 spark