
Apache Spark Source Code Walkthrough: How to Trace Code

Today I won't touch on any of Spark's more intricate internals; instead, let's talk briefly about how to trace through its code. As everyone knows, Spark is written in Scala, and because Scala has so much syntactic sugar, it is easy to lose the thread while following the code. On top of that, Spark uses Akka for message passing, so how do you figure out who receives a given message?
 
        new Throwable().printStackTrace
When tracing code we often lean on the logs, and for each log line we want to know who its caller is. Sometimes, though, our knowledge of the Spark codebase or of Scala isn't deep enough to find the answer quickly. Is there a simple trick?
 
My trick is to add the following line right where the log statement appears:
 
new Throwable().printStackTrace()
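If you sprinkle this trick in several places, it helps to tag each trace so the stacks can be told apart in the log. A tiny helper along these lines works (my own hypothetical sketch; TraceUtil and whoCalledMe do not exist in Spark):

// Hypothetical helper: call TraceUtil.whoCalledMe("some tag") anywhere in the
// code under study to print the current call stack with a label attached.
object TraceUtil {
  def whoCalledMe(tag: String = ""): Unit =
    new Throwable(s"call trace $tag").printStackTrace()
}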
 
Let's walk through a concrete example.
 
For instance, after starting spark-shell, entering something as simple as sc.textFile("README.md") produces the following log:
 
14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489
14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB)  
14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took  78 ms  
14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took  79 ms  
res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:13
 
Now suppose I want to know who calls tryToPut, the function that emits the second log line. What do I do?
 
Open MemoryStore.scala and locate the following statement:
 
logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(  
          blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))  
 
Right above that statement, add:
 
new Throwable().printStackTrace()
 
Then rebuild the source:
 
sbt/sbt assembly
 
Open spark-shell again and run sc.textFile("README.md"); this time you get the output below, which shows exactly who calls tryToPut. Reading the trace from the bottom up: SparkContext.textFile broadcasts the Hadoop job configuration, and that broadcast travels through BlockManager.doPut and MemoryStore.putValues before landing in tryToPut.
 
14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489
14/07/05 19:53:27 WARN MemoryStore: just show the calltrace by entering some modified code  
java.lang.Throwable  
  at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:182)  
  at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:76)  
  at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)  
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:699)  
  at org.apache.spark.storage.BlockManager.put(BlockManager.scala:570)  
  at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:821)  
  at org.apache.spark.broadcast.HttpBroadcast.<init>(HttpBroadcast.scala:52)  
  at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)  
  at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:29)  
  at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)  
  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:787)  
  at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:556)  
  at org.apache.spark.SparkContext.textFile(SparkContext.scala:468)  
  at $line5.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:13)  
  at $line5.$read$$iwC$$iwC$$iwC.<init>(<console>:18)  
  at $line5.$read$$iwC$$iwC.<init>(<console>:20)  
  at $line5.$read$$iwC.<init>(<console>:22)  
  at $line5.$read.<init>(<console>:24)  
  at $line5.$read$.<init>(<console>:28)  
  at $line5.$read$.<clinit>(<console>)  
  at $line5.$eval$.<init>(<console>:7)  
  at $line5.$eval$.<clinit>(<console>)  
  at $line5.$eval.$print(<console>)  
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)  
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)  
  at java.lang.reflect.Method.invoke(Method.java:483)  
  at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)  
  at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)  
  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)  
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)  
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)  
  at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)  
  at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)  
  at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)  
  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)  
  at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)  
  at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)  
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)  
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)  
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)  
  at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)  
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)  
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)  
  at org.apache.spark.repl.Main$.main(Main.scala:31)  
  at org.apache.spark.repl.Main.main(Main.scala)  
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)  
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)  
  at java.lang.reflect.Method.invoke(Method.java:483)  
  at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)  
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)  
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)  
14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB)  
14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took  78 ms  
14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took  79 ms  
res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:13
 
Syncing with git
After making changes like this that you never intend to commit, how do you sync the latest upstream content back into your local copy?
 
git reset --hard  
git pull origin master
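If you would rather keep the local experiments around instead of discarding them, git stash is an alternative (plain git, nothing Spark-specific):

git stash                # shelve the local modifications
git pull origin master   # bring the branch up to date
git stash pop            # reapply the shelved modifications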
 
Tracing Akka messages
Tracking down who receives a message is comparatively easy; good use of grep is all it takes, provided you have at least a basic grasp of the actor model.
 
Again, a concrete example: we know that CoarseGrainedSchedulerBackend sends out LaunchTask messages, so who is on the receiving end? Running the following command is enough.
 
grep LaunchTask -r core/src/main
 
The output below shows clearly that CoarseGrainedExecutorBackend is the receiver of LaunchTask; to see how the message is handled once it arrives, just read the receiver's receive function.
 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:    case LaunchTask(data) =>  
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:        logError("Received LaunchTask command but executor was null")  
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala:  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage  
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
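To see why this grep works, here is a minimal Akka sketch (my own illustration, not actual Spark code; ExecutorLike, GrepDemo, and the String payload are made up): the sender fires a case-class message with the ! operator, and the receiver pattern-matches on that same case class inside its receive function, which is exactly the line grep surfaces.

import akka.actor.{Actor, ActorSystem, Props}

// A message type, analogous to Spark's LaunchTask case class.
case class LaunchTask(data: String)

// The receiver pattern-matches on the message type,
// just as CoarseGrainedExecutorBackend does.
class ExecutorLike extends Actor {
  def receive = {
    case LaunchTask(data) => println(s"launching task with: $data")
  }
}

object GrepDemo extends App {
  val system = ActorSystem("demo")
  val executor = system.actorOf(Props[ExecutorLike], "executor")
  executor ! LaunchTask("serialized task bytes")  // the sender side
  Thread.sleep(500)                               // crude: let the message be processed
  system.shutdown()                               // classic Akka (2.3-era) shutdown
}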
 
Summary
Today's material is simple and not technically deep; I'm writing it down as a record for myself so it doesn't fade with time.

