Adventures in CS

On Spark

24 Mar 2019

Updated on: 09-04-2019. These notes will be updated as I learn new things while experimenting with Spark.

Partitions and executors

One partition is processed by one task on a single executor, so the number of partitions should match the number of executors (or, more precisely, the total number of executor cores) with which you are running the job.
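For example, a minimal sketch (assuming a YARN-style deployment where spark.executor.instances is set; the input path is hypothetical):

// Repartition the data to match the number of executors requested for the job.
val numExecutors = sc.getConf.getInt("spark.executor.instances", 8)
val data = sc.textFile("hdfs:///data/input.txt").repartition(numExecutors)
println(s"partitions: ${data.getNumPartitions}, executors: $numExecutors")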

GraphX and Java

Spark supposedly has an alpha Java API for GraphX. However, if you want to change the code for the GraphX data structures and built-in algorithms, you would have to make those changes in Scala. There is also the GraphFrames project for working with graphs on Spark from Java. However, it has some limitations; take a look at the latest updates on it to see what they are.

Lazy Transformations in Spark

All transformations in Spark are lazy, i.e., they do not compute their results right away. Transformations are the operations that turn one RDD into another. They are only evaluated when an action appears in the Spark program. Actions are the operations that return a value to the driver program after running a computation on the RDD/dataset. All of the available actions in the latest version of Spark can be found here.
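A minimal sketch of the difference (the path is hypothetical); nothing runs until the final count():

// Transformations only build up a lineage.
val lines   = sc.textFile("hdfs:///data/input.txt")  // lazy read
val lengths = lines.map(_.length)                    // transformation: not computed yet
val long    = lengths.filter(_ > 80)                 // still nothing has executed

// The action triggers the whole pipeline and returns a value to the driver.
val count = long.count()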

Figuring out Location of a Partition

So, I believe the only way to know which executor a partition is executed on is to use org.apache.spark.SparkEnv to access the BlockManager that corresponds to a single executor. That's exactly how Spark knows/tracks executors (by their BlockManagers).

You could also write an org.apache.spark.scheduler.SparkListener that intercepts onExecutorAdded, onBlockManagerAdded, and their *Removed counterparts to learn how executors map to BlockManagers (but I believe SparkEnv is enough).
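A minimal sketch of such a listener (it just logs the events; sc is an existing SparkContext):

import org.apache.spark.scheduler._

// Logs executor and BlockManager registrations so you can map one to the other.
class ExecutorTrackingListener extends SparkListener {
  override def onExecutorAdded(e: SparkListenerExecutorAdded): Unit =
    println(s"executor added: ${e.executorId} on ${e.executorInfo.executorHost}")

  override def onBlockManagerAdded(b: SparkListenerBlockManagerAdded): Unit =
    println(s"block manager added: executor ${b.blockManagerId.executorId} at " +
      s"${b.blockManagerId.host}:${b.blockManagerId.port}")

  override def onExecutorRemoved(e: SparkListenerExecutorRemoved): Unit =
    println(s"executor removed: ${e.executorId} (${e.reason})")
}

sc.addSparkListener(new ExecutorTrackingListener)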

Building a Runnable Distribution

You can build a Spark runnable distribution using:

./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn -Pkubernetes

If you want to build a standalone version without SparkR support, simply use the following:

./dev/make-distribution.sh --name custom-spark --pip --tgz -Phadoop-2.7

The make-distribution.sh script always does a mvn clean package. This can take a while, so you can edit it to run a mvn package instead.

The script produces a spark-<spark-version>-SNAPSHOT-custom-spark.tgz package that you can untar to get a distribution similar to what you would download from the Spark downloads page.

Take a look at this documentation for a more detailed guide.

Cache and Persist

Cache and persist are fairly similar. cache() is equivalent to persist(MEMORY_ONLY). persist() also allows other storage levels such as DISK_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, and OFF_HEAP. A full list can be found here.
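A minimal sketch (sc is an existing SparkContext; the path is hypothetical):

import org.apache.spark.storage.StorageLevel

val lines   = sc.textFile("hdfs:///data/input.txt")
val lengths = lines.map(_.length)

lines.cache()                                   // same as persist(StorageLevel.MEMORY_ONLY)
lengths.persist(StorageLevel.MEMORY_AND_DISK)   // spills partitions to disk if memory runs out

lines.count()    // the first action materializes and stores the cached RDDs
lengths.sum()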

Mistakes

Mistakes were made.

val edges = pids.mapPartitionsWithIndex { (pid, it) =>
      val builder = new EdgePartitionBuilder[Int, Int]
      // The mistake: lines is derived from another RDD, so using it here nests
      // RDD operations inside a transformation (see the SPARK-5063 note below).
      lines(pid).foreach { line =>
        if (!line.isEmpty && line(0) != '#') {
          val lineArray = line.split("\\s+")
          if (lineArray.length < 2) {
            throw new IllegalArgumentException("Invalid line: " + line)
          }
          val srcId = lineArray(0).toLong
          val dstId = lineArray(1).toLong
          if (canonicalOrientation && srcId > dstId) {
            builder.add(dstId, srcId, 1)
          } else {
            builder.add(srcId, dstId, 1)
          }
        }
      }
      Iterator((pid, builder.toEdgePartition))
    }

The problem: RDD transformations and actions can only be invoked by the driver, not inside other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside the rdd1.map transformation. For more information, see SPARK-5063.
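A sketch of the pitfall and one way around it (the RDDs here are illustrative):

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(Seq("a" -> 1, "b" -> 2))

// Invalid: an action on rdd2 runs inside rdd1's map and fails with SPARK-5063.
// val bad = rdd1.map(x => rdd2.values.count() * x)

// Valid: run the action on the driver first, then use the plain value inside map.
val n = rdd2.values.count()
val good = rdd1.map(x => n * x)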

Number of partitions when reading files from HDFS

By default, Spark creates one partition for each input split of the HDFS file. Specifying a number of partitions that does not match the number of input splits, or calling a repartitioning function, causes the data to be partitioned again.
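A minimal sketch (the HDFS path is hypothetical):

// One partition per HDFS input split by default.
val logs = sc.textFile("hdfs:///data/logs")
println(logs.getNumPartitions)

// Asking for a different number of partitions, or repartitioning, partitions the data again.
val wider  = sc.textFile("hdfs:///data/logs", minPartitions = 64)
val fewer  = logs.repartition(8)   // full shuffle
val merged = logs.coalesce(8)      // narrower; avoids a full shuffle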

References:

  1. https://stackoverflow.com/questions/30725687/how-to-know-which-worker-a-partition-is-executed-at
  2. https://stackoverflow.com/questions/31383904/how-can-i-force-spark-to-execute-code
  3. https://stackoverflow.com/questions/29011574/how-does-spark-partitioning-work-on-files-in-hdfs

Tweet me @mb_ce if you like this post.
