Basic concepts behind Apache Spark

Apache Spark is a cluster computing system. It provides APIs for Java and Scala (among other languages). It also comes with a neat set of tools for graph and structured data processing, machine learning, and data streaming (for instance from Kafka).
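
The snippets below use a SparkContext instance called sc without showing where it comes from. As a minimal sketch (assuming local mode and an application name chosen just for illustration), such a context can be created like this:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Local mode with one worker thread per core; suitable for experiments on a single machine
SparkConf conf = new SparkConf().setAppName("spark-basics").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);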

The basic data abstraction in Spark is the Resilient Distributed Dataset (RDD), an immutable, partitioned collection of elements that can be operated on in parallel. There are several ways to create an RDD.

The first is to parallelise an existing collection in the driver program:

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

Spark will partition (slice) the data automatically based on information about the cluster (the docs suggest typically 2-4 partitions per CPU in the cluster). This value can also be set manually by providing a second argument, e.g. sc.parallelize(data, 10).
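
As a small sketch of that second argument (reusing the data list from above), you can request an explicit number of partitions and inspect how the data was sliced:

// Explicitly request 10 partitions instead of relying on the cluster defaults
JavaRDD<Integer> distData10 = sc.parallelize(data, 10);
// partitions() exposes the actual slicing of the RDD
System.out.println(distData10.partitions().size()); // prints 10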

The second way is to reference a dataset in an external storage system:

JavaRDD<String> distFile = sc.textFile("data.txt");

An important thing about Spark: all resources (such as files) are resolved in the local scope of each worker node. In other words, the file needs to be accessible at the same path on all worker nodes.

For Java there is the org.apache.spark.api.java.JavaRDD implementation. Some tools available in Spark, like MLlib (the machine learning library), require the JavaRDD to be converted to org.apache.spark.rdd.RDD, which can easily be done with:

RDD<String> distFile = sc.textFile("data.txt").rdd();

There are two types of RDD operations:
Transformations, which create a new dataset from an existing one. Transformations are lazy, i.e. they do not compute their results right away (transformations applied to some base dataset are remembered and computed only when an action requires a result to be returned to the driver program).

Actions, which return a value to the driver program after running a computation on the dataset. For example:

JavaRDD<Integer> lineLengths = lines.map(s -> s.length()); // code is not executed yet
int totalLength = lineLengths.reduce((a, b) -> a + b); // code is executed because data is returned to the driver (at this point the execution is also broken into tasks and run on the cluster)

Each transformed RDD may be recomputed each time you run an action on it (this is the default), but it can also be persisted in memory or on disk with data.persist(…).
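
As a minimal sketch (reusing the lines RDD from the previous example, with a storage level chosen just for illustration), persisting looks like this:

import org.apache.spark.storage.StorageLevel;

// Keep the transformed RDD in memory so later actions reuse it instead of recomputing it
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
lineLengths.persist(StorageLevel.MEMORY_ONLY()); // other levels: MEMORY_AND_DISK(), DISK_ONLY(), ...
int totalLength = lineLengths.reduce((a, b) -> a + b);        // first action materialises the cache
int maxLength = lineLengths.reduce((a, b) -> Math.max(a, b)); // reuses the cached partitions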

To transform an RDD we need to provide a function to the Spark operation. Depending on the Java version you have, you can either implement one of the Function interfaces in your own class (as an anonymous inner class or a named one) and pass an instance of it to Spark:

JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
       public Integer call(String s) { return s.length(); }
});

or

class GetLength implements Function<String, Integer> {
       public Integer call(String s) { return s.length(); }
}
JavaRDD<Integer> lineLengths = lines.map(new GetLength());

Or, for Java 8, you can use lambda expressions:

JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> terms2docs = data
       // Build the base set of term->docId pairs
       .flatMapToPair((TrainingDocument d) -> {
              ArrayList<Tuple2<String, Integer>> base = new ArrayList<>();
              d.getTerms().forEach(s -> {
                     base.add(new Tuple2<String, Integer>(s, d.getDocId()));
              });
              return base;
       })
       // Remove redundancy i.e. remove all identical tuples
       .distinct()
       // Group by term
       .groupBy(Tuple2::_1);
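
Note that this snippet uses the Spark 1.x Java API, where flatMapToPair returns an Iterable; in Spark 2.x and later it expects an Iterator (return base.iterator() instead). As a hypothetical follow-up (not part of the original pipeline), the grouped RDD can be consumed in the same lambda style, for example to count in how many documents each term occurs:

// Hypothetical follow-up: document frequency per term
JavaPairRDD<String, Integer> docFrequency = terms2docs
       .mapValues(docs -> {
              int n = 0;
              for (Tuple2<String, Integer> ignored : docs) {
                     n++;
              }
              return n;
       });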

The last important remark is about using variables. Spark ships copies of the final variables from the enclosing scope to each worker node. Beware of using global variables accessed by functions passed to Spark’s RDD operations: such variables are not updated globally, because each worker node works on its own copy in its local JVM, so computations that rely on them can produce incorrect results.

int counter = 0;
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
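
A minimal sketch of a safe alternative for this particular case (assuming rdd is a JavaRDD<Integer>) is to let an action bring the result back to the driver instead of mutating driver-side state from the workers:

// OK: reduce is an action, so the sum is computed on the cluster and returned to the driver
int sum = rdd.reduce((a, b) -> a + b);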
