\subtitle{Cluster computing}
\author{J. Fernando Sánchez, Joaquín Salvachúa, Gabriel Huecas }
\institute{Universidad Politécnica de Madrid}
\frametitle{LISP and functional programming}
\item Higher level programming
\item Avoid side effects
\item Pattern matching
\item A \textit{better} Java
\item Functional programming (optional)
\item Actors for (coarse) concurrency
\item Easy and repeatable deployment
\item Lots of pre-built images @
\item Building block for other tools (swarm, compose, machine...)
\frametitle{Word count in Wikipedia}
Problem: find how often each word is used in Wikipedia.
We have the text of all of Wikipedia in a text file\footnote{By happy coincidence, the first two lines are ``hi world'' and ``hi''}. It begins like this:
breaklines]
hi world
Scala (SKAH-lah)[9] is a general-purpose programming language. Scala has full support for functional programming and a strong static type system. Designed to be concise,[10] many of Scala's design decisions were inspired by criticism of Java's shortcomings.[8]
\item Read every line
\item Split every line into words
\item Count every occurrence
\item For every word, sum its occurrences
\frametitle{Possible results of every step}
List("hi world", "hi", ...)
List((hi, 1), (world, 1), (hi, 1), ...)
Map(hi -> List((hi, 1), (hi, 1)),
    world -> List((world, 1)), ...)
Map(hi -> 2, world -> 1, ...)
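The steps and intermediate results above can be traced on a tiny in-memory input with plain Scala collections. This is a sketch: the two input lines are hard-coded (the ones from the footnote) instead of read from a file, and the object and value names are illustrative.

```scala
object WordCountSteps {
  // Step 1: "read every line" (hard-coded here instead of reading a file)
  val lines: List[String] = List("hi world", "hi")

  // Steps 2-3: split every line into words and pair each word with a count of 1
  val pairs: List[(String, Int)] =
    lines.flatMap(line => line.split(" ")).map(word => (word, 1))

  // Group the pairs by word: hi -> List((hi,1), (hi,1)), world -> List((world,1))
  val grouped: Map[String, List[(String, Int)]] = pairs.groupBy(pair => pair._1)

  // Step 4: for every word, sum its occurrences
  val counts: Map[String, Int] =
    grouped.map { case (word, ps) => (word, ps.map(_._2).sum) }
}
```

Giving every step a name makes it easy to inspect each intermediate value in the shell before chaining the calls together.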
\frametitle{Running the Scala shell}
We will use docker.
docker run -it -w / -v "$PWD/Wikipedia.txt":/Wiki \
--rm williamyeh/scala
\frametitle{Scala code}
import scala.io.Source
val wiki = Source.fromFile("Wiki").getLines
wiki.flatMap(line => line.split(" "))
    .map(x => (x, 1)).toList
    .groupBy(x => x._1)
    .map({ case (k, v) =>
      (k, v.foldLeft(0)((a, b) => a + b._2)) })
{\huge Let's run it in the shell.}
\frametitle{Wikipedia is big}
{\huge What happened?}
\frametitle{Limited resources}
\item CPU limits our speed
\item Multi-cores help...
\item ...but real parallelism is \textbf{hard}
\item RAM limits how much data you can process at the same time
\item What if you need more than 128GB?
\item You could use more than one computer...
\item ... but cluster computing is even harder than ``local'' parallelism
\item Functional programming helps a bit
\huge But... this was supposed to be fun, wasn't it?
\section{Introduction to Spark}
\subsection{What is Spark?}
\frametitle{Quick definition}
{\center {\huge Apache Spark™ is a fast and general engine for large-scale data processing.\footnote{\url{}}}}
On top of that:
\item Open source (Top-level Apache project)
\item Plays well with other tools
\subsection{vs MapReduce}
\frametitle{Comparison to MapReduce}
\item In-memory data
\item Less I/O overhead
\item Faster operations
\item Caching
\item Better for iterative tasks (e.g. machine learning)
\item Some libraries are dropping MapReduce support
\frametitle{Contributors to Spark/Hadoop 2014}
\frametitle{Project status}
\subsection{Key concepts}
Data (RDDs/Datasets)
\item RDD: Resilient Distributed Dataset
\item Collections of objects spread across a cluster, stored in RAM or on Disk
\item Built through parallel transformations
\item Automatically rebuilt on failure
\item Transformations (e.g. map, filter, groupBy)
\item Actions (e.g. count, collect, save)
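In Spark, transformations are lazy: they only record how to compute a new RDD, and nothing runs until an action asks for a result. A plain Scala Iterator behaves in a similar way, so it can serve as a local analogy. This sketch involves no Spark code; the object name and the evaluation counter are illustrative.

```scala
object LazyAnalogy {
  // Returns (evaluations before the "action", evaluations after it, the result)
  def run(): (Int, Int, Int) = {
    var evaluations = 0
    // "Transformation": map on an Iterator is lazy, so this body does not run yet
    val doubled = Iterator(1, 2, 3).map { x => evaluations += 1; x * 2 }
    val before = evaluations
    // "Action": sum consumes the iterator and forces every element to be computed
    val total = doubled.sum
    (before, evaluations, total)
  }
}
```

Calling `LazyAnalogy.run()` reports zero evaluations before the sum and three after it.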
\frametitle{Transformations and actions}
\frametitle{RDDs vs Datasets}
Datasets are the future
\item More memory efficient
\item Libraries are dropping support for RDDs
\frametitle{RDDs vs Datasets}
\frametitle{Language support}
val lines = sc.textFile(...)
lines.filter(x => x.contains("ERROR")).count()
lines = sc.textFile(...)
lines.filter(lambda s: "ERROR" in s).count()
In both examples, sc is the Spark Context (more on this later)
\frametitle{Different flavors}
Language & App & REPL & Performance \\ \hline
Scala & Yes & Yes & \huge{\color{green}\smiley} \\
Java & Yes & No & \huge{\color{green}\smiley} \\
Python & Yes & Yes & \huge{\color{yellow}\smiley}\\ \hline
The read-eval-print loop (REPL) is the easiest way to get started and to explore datasets. It is just a special Spark application that accepts user input (Scala code).
\section{Working with RDDs}
From normal data structures
val nums = sc.parallelize(List(1, 2, 3))
val cont = sc.parallelize(List(("a", 1),
("a", 1),
("b", 3)))
From distributed/local sources
Note: sc is the Spark context, which the Spark interpreter creates for you
\frametitle{Operations: collect}
{\huge\ttfamily collect()}
Runs all pending transformations and returns the resulting values to the driver
> List(1, 2, 3)
> List((a, 1), (a, 1), (b, 3))
nums: List(1, 2, 3)
cont: List(("a", 1), ("a", 1), ("b", 3))
\frametitle{Operations: take}
{\huge\ttfamily take(N)}
Returns the first N elements
> List(1, 2)
> List((a, 1))
nums: List(1, 2, 3)
cont: List(("a", 1), ("a", 1), ("b", 3))
\frametitle{Operations: count}
{\huge\ttfamily count()}
Returns the number of elements in a collection
> 3
> 3
nums: List(1, 2, 3)
cont: List(("a", 1), ("a", 1), ("b", 3))
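These three operations can be tried on plain Scala collections before touching a cluster. One difference is worth noting: on a Scala List, count expects a predicate, so the closest local equivalent of RDD count() is size. This sketch uses local lists with the same contents as nums and cont; toList stands in for collect(), and the object name is illustrative.

```scala
object BasicActions {
  val nums: List[Int] = List(1, 2, 3)
  val cont: List[(String, Int)] = List(("a", 1), ("a", 1), ("b", 3))

  // collect(): a local list is already in memory, so toList is enough
  val collected: List[Int] = nums.toList
  // take(N): the first N elements
  val firstTwo: List[Int] = nums.take(2)
  val firstPair: List[(String, Int)] = cont.take(1)
  // count(): a Scala List uses size; count(p) counts the elements matching p
  val numberOfPairs: Int = cont.size
  val pairsWithKeyA: Int = cont.count(_._1 == "a")
}
```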
\frametitle{Operations: filter}
{\huge\ttfamily filter(fn)}
This time, we need to define a function.
Filter applies that function to every element, and returns those where the function returns true.
For example:
val fn = (x: Int) => x > 1
Returns a collection with the elements for which the function returns true
> List(2, 3)
nums: List(1, 2, 3)
cont: List(("a", 1), ("a", 1), ("b", 3))
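A local version of this filter, on a plain Scala List instead of an RDD (a sketch; the object name is illustrative):

```scala
object FilterExample {
  val nums: List[Int] = List(1, 2, 3)
  // A named function value that returns true for values greater than 1
  val fn: Int => Boolean = (x: Int) => x > 1
  // filter keeps only the elements for which fn returns true
  val result: List[Int] = nums.filter(fn)
}
```

The same call works on the RDD version of nums, because filter takes a function of the same shape in both cases.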
\frametitle{Quick aside: anonymous functions and underscores}
% Defining a function when it is only going to be used once is tedious and makes reading code harder.
{\Large In Scala, we can define ``anonymous functions'', also known as lambda functions.}
val fn = (x: Int) => x > 1
nums.filter(fn)
{\Large is equivalent to:}
nums.filter((x: Int) => x > 1)
{\Large Additionally, the Scala compiler is smart enough to infer types in this example. Hence, we could simply write:}
nums.filter(x => x > 1)
{\Large Furthermore, we could use underscores to replace arguments:}
\frametitle{Quick aside: anonymous functions and underscores}
{\large Every underscore in an anonymous function stands for a new, different parameter.
Hence, these two expressions are equivalent:
_ + _
(x,y) => x+y
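A quick way to check this equivalence is to pass both forms to the same operation. The sketch below uses a local List and reduce, which combines two elements at a time and therefore needs a two-parameter function; the object name is illustrative.

```scala
object UnderscoreExample {
  val nums: List[Int] = List(1, 2, 3)
  // Two underscores: two different parameters, i.e. (x, y) => x + y
  val withUnderscores: Int = nums.reduce(_ + _)
  val withNames: Int = nums.reduce((x, y) => x + y)
  // One underscore: a single parameter, i.e. x => x > 1
  val filtered: List[Int] = nums.filter(_ > 1)
}
```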
\frametitle{Operations: filter}
\Large Our last example could be written more concisely as:
> List(2, 3)
\frametitle{Operations: filter}
\Large What would this filter do?
cont.filter(_._1 == "a" && _._2 == 1)
> ???
\frametitle{Operations: filter}
<console>:9: error: missing parameter type
for expanded function ((x$1, x$2) =>
Note: The expected type requires a
one-argument function accepting a 2-Tuple.
\frametitle{Operations: filter}
Remember, each new underscore represents a new argument. So that expression expands to:
cont.filter((x, y) => x._1 == "a" &&
y._2 == 1)
\frametitle{Operations: filter}
The correct expression is:
cont.filter(x => x._1 == "a" &&
x._2 == 1)
> List((a, 1), (a, 1))
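When the elements are tuples, another option that avoids both `_1`/`_2` and the underscore pitfall is a pattern-matching anonymous function, which destructures each pair into named parts. A sketch on a local list with the same contents as cont (the object name is illustrative):

```scala
object TupleFilter {
  val cont: List[(String, Int)] = List(("a", 1), ("a", 1), ("b", 3))
  // Accessor version: one parameter x, used twice
  val withAccessors: List[(String, Int)] =
    cont.filter(x => x._1 == "a" && x._2 == 1)
  // Pattern-matching version: { case (k, v) => ... } takes one tuple and names its parts
  val withPattern: List[(String, Int)] =
    cont.filter { case (k, v) => k == "a" && v == 1 }
}
```

Both versions define a single-parameter function, so neither runs into the "missing parameter type" error.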
\frametitle{Operations: map}
{\huge\ttfamily map(fn)}
Applies a function to every element and returns the results
> List(1, 1, 3)
nums.map(_ * 3)
> List(3, 6, 9)
nums: List(1, 2, 3)
cont: List(("a", 1), ("a", 1), ("b", 3))
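The same kind of map calls on local lists (a sketch; the lists mirror nums and cont, and the object name is illustrative):

```scala
object MapExample {
  val nums: List[Int] = List(1, 2, 3)
  val cont: List[(String, Int)] = List(("a", 1), ("a", 1), ("b", 3))
  // Multiply every number by 3
  val tripled: List[Int] = nums.map(_ * 3)
  // Keep only the second element of every pair
  val values: List[Int] = cont.map(_._2)
  // map may also change the element type, here from Int to String
  val labels: List[String] = nums.map(n => s"item$n")
}
```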
\frametitle{Operations: reduce}
{\huge\ttfamily reduce(fn)}
Merges all elements into one, using an associative and commutative function
> 6
cont.reduce((x, y) => (x._1+y._1,
> (aab, 3)
nums: List(1, 2, 3)
cont: List(("a", 1), ("a", 1), ("b", 3))
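A local sketch of reduce. Spark reduces each partition separately and then merges the partial results, which is why the function should be associative and commutative. Integer addition is both; string concatenation is only associative, so on a cluster the order of the letters in a result such as aab is not guaranteed. On a local List, reduce simply goes left to right. The tuple function below (concatenate keys, add values) is an illustrative choice.

```scala
object ReduceExample {
  val nums: List[Int] = List(1, 2, 3)
  val cont: List[(String, Int)] = List(("a", 1), ("a", 1), ("b", 3))
  // Sum of all numbers: (1 + 2) + 3
  val total: Int = nums.reduce(_ + _)
  // Concatenate the keys and add the values, two pairs at a time
  val merged: (String, Int) =
    cont.reduce((x, y) => (x._1 + y._1, x._2 + y._2))
}
```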
{\huge\ttfamily groupByKey()}
Groups the values of a list of pairs by key (the first element of each tuple)
> [(b, [3]), (a, [1,1])]
nums: List(1, 2, 3)
cont: List(("a", 1), ("a", 1), ("b", 3))
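Plain Scala collections have no groupByKey, but groupBy on the first element of each pair, followed by keeping only the values, gives the same grouping locally. This is a sketch of the result, not of how Spark implements it; the object name is illustrative.

```scala
object GroupByKeyExample {
  val cont: List[(String, Int)] = List(("a", 1), ("a", 1), ("b", 3))
  // Group the pairs by key, then keep only the values of each group
  val grouped: Map[String, List[Int]] =
    cont.groupBy(_._1).map { case (key, pairs) => (key, pairs.map(_._2)) }
}
```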
{\huge \ttfamily reduceByKey(fn)}
Groups by key and reduces the values of each key with fn
> [(b,3), (a,2)]
reduceByKey is more efficient than groupByKey followed by a reduce: each worker first combines the values of every key locally, so only one partial result per key and worker is sent over the network, instead of every single pair.
\footnote{\href{}{Databricks' post on avoiding groupByKey }}
nums: List(1, 2, 3)
cont: List(("a", 1), ("a", 1), ("b", 3))
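The efficiency argument can be illustrated by simulating two workers locally. In this plain Scala sketch, the split of cont into two partitions is an assumption made for illustration: each partition first sums the values of its own keys, and only those partial sums are merged, which is the idea behind reduceByKey combining on each worker. The result matches grouping all raw pairs first and summing afterwards.

```scala
object ReduceByKeySketch {
  // Sum the values of each key within one collection of pairs
  def combine(pairs: Seq[(String, Int)]): Map[String, Int] =
    pairs.groupBy(_._1).map { case (key, ps) => (key, ps.map(_._2).sum) }

  // Hypothetical split of cont across two workers
  val partitions: List[List[(String, Int)]] =
    List(List(("a", 1), ("a", 1)), List(("b", 3)))

  // Each worker combines locally: worker 1 sends (a, 2) instead of two (a, 1) pairs
  val partials: List[Map[String, Int]] = partitions.map(p => combine(p))

  // Only the partial results are merged (the "shuffle" step)
  val reduced: Map[String, Int] = combine(partials.flatMap(_.toList))

  // Same answer as grouping all raw pairs first and summing afterwards
  val viaGroup: Map[String, Int] = combine(partitions.flatten)
}
```

Here the merge step receives two pairs instead of three; with real data and many repeated keys, the saving grows accordingly.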
And once you are done, save your results to a file.
\frametitle{Example: Search in logs}
val lines = sc.textFile("hdfs://...")
val errors = lines.filter(s => s.contains("ERROR"))
val messages = errors.map(s => s.split("\t")(1))
messages.filter(s => s.contains("mysql")).count()
messages.filter(s => s.contains("php")).count()
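The same pipeline can be tried on a few in-memory lines. This sketch assumes a hypothetical tab-separated log format in which the first field is the level and the second one the message; the field index has to match your real logs. Note that on a local List, count takes the predicate directly.

```scala
object LogSearch {
  // Hypothetical log lines: level <TAB> message
  val lines: List[String] = List(
    "ERROR\tmysql connection refused",
    "INFO\tphp worker started",
    "ERROR\tphp fatal error",
    "ERROR\tmysql timeout"
  )
  // Keep only the error lines
  val errors: List[String] = lines.filter(s => s.contains("ERROR"))
  // Extract the message (second field, index 1)
  val messages: List[String] = errors.map(s => s.split("\t")(1))
  // Count the messages that mention each technology
  val mysqlErrors: Int = messages.count(s => s.contains("mysql"))
  val phpErrors: Int = messages.count(s => s.contains("php"))
}
```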
\section{Using Spark}
\frametitle{Local deployment using docker-compose}
{ \Large
Get the repo
Move to the repo
Run all the containers
Launch spark-shell inside the master container
git clone
cd docker-spark
docker-compose up
docker exec -it dockerspark_master_1 bin/spark-shell
Compose.yml Master
image: gettyimages/spark
command: bin/spark-class org.apache.spark.deploy.master.Master -h master
hostname: master
environment:
  MASTER: spark://master:7077
# ... bunch of ports ...
volumes:
  - ./conf/master:/conf
  - ./data:/tmp/data
Compose.yml Worker
image: gettyimages/spark
command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
hostname: worker
links:
  - master
volumes:
  - ./conf/worker:/conf
  - ./data:/tmp/data
\frametitle{Useful info}
\item ./data folder is mounted as /tmp/data
\item Copy your datasets there
\item Load them in the shell: \texttt{sc.textFile("/tmp/data/...")}
\item Master Web UI (localhost:8080)
\item Worker Web UI (localhost:8081)
\item REPL Web UI (localhost:4040 when launched)
\textbf{A word of caution}: like any other app, the shell reserves resources on startup, whether you use them or not.
\item Write the code
\item Compile the jar
\item Make your data available to every node in the cluster
\item Submit it to your cluster
\frametitle{Writing applications}
Example application
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("Spark Example"))
    // ... Your program ...
    sc.stop()
  }
}
\frametitle{Running an application}
./bin/spark-submit --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
\section{Full examples}
\frametitle{Word frequency in wikipedia, revisited}
val wiki = sc.textFile("Wikipedia.txt")
val counts = wiki.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
Pure scala
import scala.io.Source
val wiki = Source.fromFile("Wiki").getLines
wiki.flatMap(line => line.split(" "))
    .map(x => (x, 1)).toList
    .groupBy(x => x._1)
    .map({ case (k, v) =>
      (k, v.foldLeft(0)((a, b) => a + b._2)) })
\huge{Shall we try it in the shell?}
\usebackgroundtemplate{\vbox to \paperheight{\vfil\hbox to \paperwidth{\hfil\includegraphics[height=\paperheight]{images/justwaithere.jpg}\hfil}\vfil}}%
\frametitle{Spark is not magic}
\item We still have to add more resources
\item Caching may cause the Spark version to use \textbf{more memory} (this can be configured)
\huge{However, it allows us to scale our application}
\frametitle{Page rank}
\item Created by Google
\item Rank given by links and their importance
\item Iterative (Perfect for Spark!)
\text{PageRank of site} = \sum_{\text{inbound links}} \frac{\text{PageRank of inbound link}}{\text{Number of links on that page}}
PR(u) = (1-d) + d \times \sum_{v \in B_u} \frac{PR(v)}{N(v)}
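The iteration can be sketched with plain Scala collections, following the formula above: every page v sends PR(v)/N(v) to each page it links to, the contributions are summed per page, and the damping factor d is applied. The toy graph, d = 0.85 and the number of iterations are assumptions for illustration; in Spark, links and ranks would be pair RDDs and the per-page sum a reduceByKey.

```scala
object PageRankSketch {
  // Hypothetical toy graph: each page maps to the pages it links to
  val links: Map[String, List[String]] = Map(
    "a" -> List("b", "c"),
    "b" -> List("c"),
    "c" -> List("a")
  )

  // Iterates PR(u) = (1 - d) + d * (sum over inbound pages v of PR(v) / N(v))
  def pageRank(graph: Map[String, List[String]], iterations: Int,
               d: Double = 0.85): Map[String, Double] = {
    // Every page starts with rank 1.0
    var ranks: Map[String, Double] = graph.map { case (page, _) => (page, 1.0) }
    for (_ <- 1 to iterations) {
      // Each page v sends PR(v) / N(v) to every page it links to
      val contribs: List[(String, Double)] = graph.toList.flatMap { case (page, outLinks) =>
        outLinks.map(dest => (dest, ranks(page) / outLinks.size))
      }
      // Sum the contributions per destination page (a reduceByKey in Spark)
      val summed: Map[String, Double] =
        contribs.groupBy(_._1).map { case (page, cs) => (page, cs.map(_._2).sum) }
      // Apply the damping factor; pages without inbound links get only 1 - d
      ranks = graph.keys.map(page => (page, (1 - d) + d * summed.getOrElse(page, 0.0))).toMap
    }
    ranks
  }
}
```

Because every page in this graph has outgoing links, the ranks always add up to the number of pages, which is a handy sanity check. Page c, linked from both a and b, ends up with the highest rank.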
\usebackgroundtemplate{\vbox to \paperheight{\vfil\hbox to \paperwidth{\hfil\includegraphics[height=\paperheight]{images/pageranktoy.jpg}\hfil}\vfil}}%
\frametitle{Page rank (code)}
\section*{Next week}
\begin{frame}{Next week}
\item Advanced Spark configuration
\item Multiple hosts
\item Spark ecosystem
\item More examples in IBM BlueMix
\section{Acknowledgements and useful links}
\item \href{}{Spark programming guide}
\item \href{}{Databricks introducing apache spark datasets}
\item \href{}{Data Analytics with Hadoop: In-Memory Computing with Spark}