Wednesday, 3 October 2018

Installation Manual for Spark-2.1.1 on Ubuntu-14.04 64 bit


Single-Node Cluster
Configure Proxy Setting
$ sudo gedit /etc/apt/apt.conf
Add the following lines [needed only behind a proxy; replace user-id, password, proxy-address and port with your own values]
Acquire::http::Proxy "http://user-id:password@proxy-address:port";
Acquire::https::Proxy "https://user-id:password@proxy-address:port";
Acquire::ftp::Proxy "ftp://user-id:password@proxy-address:port";
Update the Ubuntu package index
$ sudo apt-get update
Verifying Java Installation
$ java -version
[If Java is already installed on the system, it gives a response like the following:
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
If Java is not installed, install it before proceeding to the next step.]
$ sudo apt-get install default-jdk
Or
$ sudo apt-get install openjdk-7-jdk
Install OpenSSH server (or ssh)
$ sudo apt-get install openssh-server
Or
$ sudo apt-get install ssh
$ sudo apt-get install rsync
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
[For a multi-node cluster, also append the generated public key (id_dsa.pub) to ~/.ssh/authorized_keys on each worker machine. The private key stays on the master.]
Or install OpenSSH server and client
$ sudo apt-get install openssh-server openssh-client
$ ssh-keygen -t rsa -P ""
Append the content of .ssh/id_rsa.pub (of the master) to .ssh/authorized_keys (on all the slaves as well as the master)
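
The key-distribution step above can be sketched as a small shell function. This is only an illustration: the function name install_pubkey and its two arguments are assumptions, not part of OpenSSH. It appends a public key to authorized_keys once and tightens the permissions, since sshd in its default StrictModes setting ignores key files that other users can write to.

```shell
# install_pubkey PUBKEY_FILE SSH_DIR   (hypothetical helper, not an OpenSSH tool)
# Appends the public key to SSH_DIR/authorized_keys unless it is already there,
# then restricts permissions on the directory and the file.
install_pubkey() {
  pubkey_file=$1
  ssh_dir=$2
  mkdir -p "$ssh_dir"
  touch "$ssh_dir/authorized_keys"
  # -q quiet, -x whole-line match, -F fixed string (no regex)
  if ! grep -qxF -- "$(cat "$pubkey_file")" "$ssh_dir/authorized_keys"; then
    cat "$pubkey_file" >> "$ssh_dir/authorized_keys"
  fi
  chmod 700 "$ssh_dir"
  chmod 600 "$ssh_dir/authorized_keys"
}
```

On a worker, after copying the master's id_rsa.pub into the home directory, it could be called as: install_pubkey ~/id_rsa.pub ~/.ssh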

Create a Group
Create a group, configure the group sudo permissions and then add the user to the group. Here 'hadoop' is a group name and 'hduser' is a user of the group.
$ sudo addgroup hadoop
$ sudo adduser --ingroup hadoop hduser

Configure the sudo permissions for 'hduser'
$ sudo visudo
[visudo opens the sudoers file in nano, the default editor on Ubuntu]
[Add the permissions to sudoers, i.e. add this line]
hduser ALL=(ALL) ALL
[Press CTRL + O and Enter to save, then CTRL + X to exit. If you press CTRL + X first, enter Y to save the file.]

Create /app/spark/tmp directory
$ sudo mkdir -p /app/spark/tmp

Change the ownership and permissions of the directory /app/spark/tmp
$ sudo chown -R hduser /app/spark/tmp
$ sudo chmod -R 755 /app/spark/tmp

Switch User
[su starts a shell with the identity and privileges of another user account, here hduser]
$ su hduser

Verifying Scala Installation
$ scala -version
[If Scala is already installed on the system, it gives a response like the following:
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
If Scala is not installed, proceed to the next step for installation.]
Download Scala
Download Scala 2.11.8 (Spark 2.1.1 is built against Scala 2.11) by visiting the following link: http://www.scala-lang.org/download

Download Apache Spark
Download Spark 2.1.1 (pre-built for Hadoop 2.6) by visiting the following link: https://spark.apache.org/downloads.html
[After downloading, you will find the Spark tar file in the download folder]

Change the directory to /home/hduser/Desktop [or to the downloads directory where the files spark-2.1.1-bin-hadoop2.6.tgz and scala-2.11.8.tgz were saved]
$ cd /home/hduser/Desktop/

Untar the spark-2.1.1-bin-hadoop2.6.tgz file
$ sudo tar -zxvf spark-2.1.1-bin-hadoop2.6.tgz

Move the contents of spark-2.1.1-bin-hadoop2.6 folder to /usr/local/spark
$ sudo mv spark-2.1.1-bin-hadoop2.6 /usr/local/spark

Untar the scala-2.11.8.tgz file
$ sudo tar -zxvf scala-2.11.8.tgz

Move the contents of scala-2.11.8 folder to /usr/local/scala
$ sudo mv scala-2.11.8 /usr/local/scala

Edit $HOME/.bashrc file by adding the Spark and Scala paths
$ sudo gedit $HOME/.bashrc
Or, equivalently
$ sudo gedit ~/.bashrc
[Add the following lines to the ~/.bashrc file. They append the directories that hold the Spark, Java and Scala executables to the PATH variable]
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export SCALA_HOME=/usr/local/scala
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin:$JAVA_HOME/bin:$SCALA_HOME/bin

Reload the changed $HOME/.bashrc settings
$ source $HOME/.bashrc
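
Each time .bashrc is re-sourced, a plain append adds the same directories to PATH again. A minimal sketch of a guard against that is shown here; the helper name path_append is an assumption introduced for illustration, and the two directories are the install locations used in this manual.

```shell
# path_append DIR: add DIR to PATH only if it is not already listed.
# (hypothetical helper for ~/.bashrc)
path_append() {
  case ":$PATH:" in
    *":$1:"*) ;;                 # already present, do nothing
    *) PATH="$PATH:$1" ;;
  esac
}

SPARK_HOME=/usr/local/spark
SCALA_HOME=/usr/local/scala
path_append "$SPARK_HOME/bin"
path_append "$SCALA_HOME/bin"
export PATH
```

Wrapping the PATH value in colons lets the pattern match a directory at the start, middle or end of the list with one test.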

Verify Scala Installation
$ scala -version

Verify the Spark Installation
[Write the following command for opening Spark shell]
$ spark-shell

Change the directory to /usr/local/spark/conf
$ cd /usr/local/spark/conf

Copy the spark-env.sh.template to spark-env.sh
$ sudo cp spark-env.sh.template spark-env.sh

Edit spark-env.sh file
$ sudo gedit spark-env.sh
[Add the following lines to spark-env.sh file, and Save and Close]
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export PATH=$PATH:$JAVA_HOME/bin
export SCALA_HOME=/usr/local/scala
# export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=2
# export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_DIR=/app/spark/tmp
export SPARK_MASTER_HOST=hadoopmaster
export SPARK_MASTER_PORT=7077
# Options read in YARN client mode
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_EXECUTOR_INSTANCES=2
export SPARK_EXECUTOR_CORES=1
export SPARK_EXECUTOR_MEMORY=1G
export SPARK_DRIVER_MEMORY=1G
export SPARK_YARN_APP_NAME=Spark

Copy the spark-defaults.conf.template to spark-defaults.conf
$ sudo cp spark-defaults.conf.template spark-defaults.conf

Edit spark-defaults.conf file
$ sudo gedit spark-defaults.conf
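[Add the following lines to the spark-defaults.conf file, then save and close]
spark.master                spark://127.0.0.1:7077
# or, for a named master: spark.master   spark://<HOSTNAME OF YOUR MASTER NODE>:7077
spark.serializer             org.apache.spark.serializer.KryoSerializer
# dynamic allocation needs the external shuffle service (spark.shuffle.service.enabled true) on every worker; drop the next line if that service is not set up
spark.dynamicAllocation.enabled        true
spark.driver.memory              5g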
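
To confirm which value a key currently has in spark-defaults.conf, the file can be read back with a few lines of shell. This is a sketch only: conf_get is a hypothetical helper, and it assumes the file's "key, whitespace, value" layout with # comment lines.

```shell
# conf_get KEY FILE: print the value of KEY from a spark-defaults.conf style file.
# (hypothetical helper; prints nothing if the key is absent or commented out)
conf_get() {
  awk -v key="$1" '
    /^[[:space:]]*#/ { next }                  # skip comment lines
    $1 == key {
      $1 = ""                                  # drop the key field
      sub(/^[[:space:]]+/, "")                 # trim the gap that is left
      print
      exit
    }
  ' "$2"
}
```

For example: conf_get spark.master /usr/local/spark/conf/spark-defaults.conf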

Copy the slaves.template to slaves
$ sudo cp slaves.template slaves
[If slaves.template does not exist, create an empty slaves file instead]

Edit slaves file
$ sudo gedit slaves
[Add the following line to the slaves file, then save and close. List the hostname or IP address of every worker, one entry per line]
localhost
[For a multi-node cluster, list all nodes as below]
<HOSTNAME OF YOUR MASTER NODE>
<HOSTNAME OF YOUR SLAVE NODE 1>
...
...
<HOSTNAME OF YOUR SLAVE NODE n>
[Repeat the steps above on the other slave nodes]

Update spark-env.sh on worker nodes (Optional): To start N worker instances on one machine, set SPARK_WORKER_INSTANCES in that machine's spark-env.sh, as done above. For example, SPARK_WORKER_INSTANCES=3 starts 3 worker instances on that machine (master or slave). SPARK_WORKER_CORES and SPARK_WORKER_MEMORY then apply to each instance.
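
Because the per-worker limits apply to every instance, the resources one machine offers to the cluster scale with the instance count. The arithmetic can be sketched as below; worker_capacity is a hypothetical helper, and the 1 GB figure is the commented-out SPARK_WORKER_MEMORY value from the file above.

```shell
# worker_capacity INSTANCES CORES_PER_WORKER MEM_GB_PER_WORKER
# Prints the total cores and memory (GB) one machine offers to the cluster.
worker_capacity() {
  echo "$(( $1 * $2 )) cores, $(( $1 * $3 )) GB"
}

worker_capacity 3 2 1   # 3 instances, each with 2 cores and 1 GB: prints "6 cores, 3 GB"
```

The totals should stay within what the machine physically has.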

Change the ownership and permissions of the directory /usr/local/spark
$ sudo chown -R hduser /usr/local/spark
$ sudo chmod -R 755 /usr/local/spark

Change the ownership and permissions of the directory /usr/local/scala
$ sudo chown -R hduser /usr/local/scala
$ sudo chmod -R 755 /usr/local/scala

Start Master and all Worker Daemons
$ cd /usr/local/spark/sbin
$ ./start-all.sh
Or $ sh /usr/local/spark/sbin/start-all.sh
Or Start master using ./start-master.sh followed by ./start-slaves.sh

Check the running daemons with jps (Java Virtual Machine Process Status Tool). jps only reports JVMs that the current user has permission to access, so run it as the user that started Spark. The output should list Master and Worker.
$ jps
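
The jps check can be scripted. The sketch below assumes the usual two-column jps output (process id, then main class name); has_daemon is a hypothetical helper that reads that output from standard input, so it can be tried on sample text without a running cluster.

```shell
# has_daemon NAME: succeed if jps-style input on stdin lists a JVM named NAME.
# (hypothetical helper; expects lines of the form "<pid> <name>")
has_daemon() {
  awk -v name="$1" '$2 == name { found = 1 } END { exit !found }'
}

# Usage on a live node (requires a JDK on PATH):
#   jps | has_daemon Master && echo "Master is running"
#   jps | has_daemon Worker && echo "Worker is running"
```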

Web-UI
[Once Spark is up and running, check the web UIs of the components as listed below]
http://localhost:8080 [Spark Master UI; http://127.0.0.1:8080/ is the same page]
http://localhost:8088 [YARN ResourceManager UI, available only if Hadoop YARN is running]

Stop Master and all Worker Daemons
$ ./stop-all.sh
Or $ sh /usr/local/spark/sbin/stop-all.sh

Start Spark shell using YARN
Launch the Spark shell using the following command
$ spark-shell --master yarn --deploy-mode client
[The shell cannot run in cluster deploy mode]
[Deprecated form, kept for reference; Spark 2.x prints a deprecation warning for it]
$ /usr/local/spark/bin/spark-shell --master yarn-client

In case you want to start the Spark shell without YARN, use the following command
$ /usr/local/spark/bin/spark-shell --master local[2]
Here local[2] means run Spark in local mode (no cluster manager) with two worker threads

The Spark shell can also be connected to a standalone cluster as follows:
$ spark-shell --master spark://192.168.213.133:7077
Or, from /usr/local/spark, using the master's hostname:
$ bin/spark-shell --master spark://hadoopmaster:7077
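
The --master values used in this section fall into three groups. As a rough aid for telling them apart, the sketch below classifies a master URL by its form. master_kind is a hypothetical helper covering only the forms that appear in this manual; Spark accepts others (for example mesos://), which this sketch reports as unknown.

```shell
# master_kind URL: classify a --master value by its form.
# (hypothetical helper; handles only the forms used in this manual)
master_kind() {
  case "$1" in
    local|local\[*\])             echo "local" ;;       # local, local[2], local[*]
    spark://*)                    echo "standalone" ;;  # standalone cluster master
    yarn|yarn-client|yarn-cluster) echo "yarn" ;;       # yarn-* forms are deprecated
    *)                            echo "unknown" ;;
  esac
}

master_kind "local[2]"                   # prints "local"
master_kind "spark://hadoopmaster:7077"  # prints "standalone"
```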

Multi-Node Cluster

@ Master Node
Find inet address
$ ifconfig
Assume the following hosts:
Namenode (master)     hadoopmaster     192.168.23.130
Datanodes (slaves)    hadoopslave1     192.168.23.131
                      hadoopslave2     192.168.23.132
Add Entries in hosts file
$ sudo gedit /etc/hosts
[Now add entries of master and slaves]
192.168.23.130           hadoopmaster
192.168.23.131           hadoopslave1
192.168.23.132           hadoopslave2
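
After editing /etc/hosts it is worth confirming that each cluster name maps to the intended address. A minimal sketch follows; hosts_lookup is a hypothetical helper that reads a hosts-format file directly, so it checks only the file, not DNS or other name services.

```shell
# hosts_lookup NAME FILE: print the first IP address that FILE maps to NAME.
# (hypothetical helper; FILE uses the /etc/hosts layout "IP name [alias...]")
hosts_lookup() {
  awk -v name="$1" '
    /^[[:space:]]*#/ { next }                       # skip comment lines
    { for (i = 2; i <= NF; i++)                     # field 1 is the IP address
        if ($i == name) { print $1; exit } }
  ' "$2"
}

# Usage on a cluster node:
#   hosts_lookup hadoopslave1 /etc/hosts
```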
Edit spark-defaults.conf file
$ sudo gedit /usr/local/spark/conf/spark-defaults.conf
[Add the following lines to the spark-defaults.conf file, then save and close]
spark.master                spark://hadoopmaster:7077
spark.serializer             org.apache.spark.serializer.KryoSerializer

Edit slaves file
$ sudo gedit /usr/local/spark/conf/slaves
[Add the following lines to the slaves file, then save and close. List the hostname or IP address of every worker, one entry per line]
hadoopmaster
hadoopslave1
hadoopslave2
@ Slave Nodes
Add Entries in hosts file
$ sudo gedit /etc/hosts
[Now add entries of master and slaves]
192.168.23.130           hadoopmaster
192.168.23.131           hadoopslave1
192.168.23.132           hadoopslave2
Java and Scala must be installed on every slave node
Copy the setup from the master to all the slaves
@ Master Node
Create a tarball of the configured setup
[The configured installation is /usr/local/spark, where the single-node steps moved it]
$ cd /usr/local
$ tar czf ~/spark.tar.gz spark
Copy the configured tarball to all the slaves
$ scp ~/spark.tar.gz hadoopslave1:~
$ scp ~/spark.tar.gz hadoopslave2:~
Un-tar the configured Spark setup on all the slaves
@ Slave Nodes
$ sudo tar xzf ~/spark.tar.gz -C /usr/local
[Run this command on every slave]
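
With more than a couple of slaves, the copy step above is easier to drive from the slaves file. The sketch below only prints the scp commands (a dry run) so they can be reviewed first. print_copy_commands is a hypothetical helper, and it assumes one hostname per line with optional # comments, as in conf/slaves.

```shell
# print_copy_commands SLAVES_FILE TARBALL
# Prints (does not run) one scp command per host listed in SLAVES_FILE.
# (hypothetical helper; pipe the output to sh to actually run the copies)
print_copy_commands() {
  while IFS= read -r host || [ -n "$host" ]; do
    case "$host" in
      ''|'#'*) continue ;;       # skip blank lines and comments
    esac
    echo "scp $2 $host:~"
  done < "$1"
}

# Usage on the master:
#   print_copy_commands /usr/local/spark/conf/slaves spark.tar.gz
```

Since the slaves file in this manual also lists hadoopmaster, the output includes a copy to the master itself, which can be removed before running.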
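@ Master Node
Start and Stop Spark Services
$ /usr/local/spark/sbin/start-all.sh
$ /usr/local/spark/sbin/stop-all.sh
Check daemons on Master
$ jps
[The output should list a Master process]
Check daemons on Slaves
$ jps
[The output should list a Worker process]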
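Spark Web UI
Spark Master UI
http://hadoopmaster:8080/
YARN ResourceManager UI [only if Hadoop YARN is running]
http://hadoopmaster:8088
Spark application UI [available while an application such as spark-shell is running]
http://hadoopmaster:4040/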

Enable History Server in Spark
$ hdfs dfs -mkdir -p /user/hduser/spark-logs
$ sudo gedit /usr/local/spark/conf/spark-defaults.conf
[Add the following lines to the spark-defaults.conf file, then save and close]
spark.eventLog.enabled           true
spark.eventLog.dir                  hdfs://hadoopmaster:9000/user/hduser/spark-logs
spark.history.fs.logDirectory  hdfs://hadoopmaster:9000/user/hduser/spark-logs
[Start the history server]
$ /usr/local/spark/sbin/start-history-server.sh
[UI]     http://hadoopmaster:18080/

Scala – Spark Shell Commands-I

Create a simple text file sample.txt with some content.
$ sudo gedit sample.txt

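Put the above file on HDFS
[The word count below reads /user/hduser/input/sample.txt, so copy the file there]
$ hdfs dfs -mkdir -p /user/hduser/input
$ hdfs dfs -copyFromLocal ./sample.txt /user/hduser/input/
[Or upload a whole local directory; the relative target input resolves to /user/hduser/input]
$ bin/hdfs dfs -put /home/hduser/data input
[Verify the upload]
$ bin/hdfs dfs -ls /user/hduser/input
$ bin/hdfs dfs -ls -R /user/hduser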

Execute following steps of word count example
scala> val logFile = "hdfs://hadoopmaster:9000/user/hduser/input/sample.txt"
scala> val file = sc.textFile(logFile)
scala> val counts = file.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
scala> counts.collect()
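
For a small sample.txt, the Spark result can be cross-checked against a plain shell pipeline run on the local copy of the file. This is a sketch for sanity checking only, not Spark code. word_count is a hypothetical helper, and unlike split(" ") it drops the empty tokens produced by consecutive spaces.

```shell
# word_count FILE: print "<word> <count>" pairs, one per line, sorted by word.
# (hypothetical helper; splits on spaces and ignores empty tokens)
word_count() {
  tr ' ' '\n' < "$1" | grep -v '^$' | sort | uniq -c | awk '{ print $2, $1 }'
}

# Usage:
#   word_count ./sample.txt
```

The pairs should match the tuples returned by counts.collect(), though Spark does not return them in sorted order.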

Scala – Spark Shell Commands-II

Start the Spark Shell
$ bin/spark-shell

Create a new RDD
scala> val data = sc.textFile("data.txt")
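[Reads a text file and creates an RDD. sc is the SparkContext object that the shell creates for you. The relative path is resolved against the directory from which spark-shell was started, so create data.txt there (e.g. in the Spark home directory). If Spark picks up an HDFS configuration through HADOOP_CONF_DIR, use a file:/// URL with the absolute path to read from the local filesystem]
scala> val no = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val noData = sc.parallelize(no)
[Create an RDD through a parallelized collection]
scala> val newRDD = noData.map(n => n * 2)
[Create a new RDD from an existing RDD; note that map is called on the RDD noData, not on the Array no]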

Number of Items in the RDD
scala> noData.count()

Filter Operation
scala> val DFData = data.filter(line => line.contains("DataFlair"))

Filter transformation and count action together
scala> data.filter(line => line.contains("DataFlair")).count()

Read the first item from the RDD
scala> data.first()

Read the first 5 items from the RDD
scala> data.take(5)

RDD Partitions
scala> data.partitions.length
[Note: sc.textFile requests a minimum of 2 partitions by default (its minPartitions argument). When an RDD is created from an HDFS file, the number of partitions normally equals the number of HDFS blocks in the file.]

Cache the file
scala> data.cache()
[cache() is lazy: running it only marks the RDD for caching, so the storage page of the web UI (http://localhost:4040/storage) is still blank at this point. The RDD is actually cached the first time an action reads its data. Let’s run some actions]
scala> data.count()
scala> data.collect()

Read Data from HDFS file
scala> val hFile = sc.textFile("hdfs://localhost:9000/inp")
[To read data from an HDFS file, specify the complete HDFS URL in the form hdfs://IP:PORT/PATH]

Spark WordCount Program in Scala
scala> val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

Read the result on console
scala> wc.take(5)
[display first 5 results]

Write the data to HDFS file
scala> wc.saveAsTextFile("hdfs://localhost:9000/out")

Examples of Map and FlatMap

Map Transformation Scala Example
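Create RDD
val data = spark.read.textFile("INPUT-PATH").rdd
Map Transformation-1
val newData = data.map(line => line.toUpperCase())
Map Transformation-2
import scala.xml.XML
// Parse each line as one XML element and extract its "Tags" attribute.
// Assumes every line is well-formed XML with a Tags attribute; .get throws otherwise.
val tag = data.map { line =>
  val xml = XML.loadString(line)
  xml.attribute("Tags").get.toString()
}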

Map Transformation Java Example
Create RDD
// Requires: import org.apache.spark.api.java.JavaRDD;
//           import org.apache.spark.api.java.function.Function;
JavaRDD<String> linesRDD = spark.read().textFile("INPUT-PATH").javaRDD();
Map Transformation
// Trim each line and convert it to upper case
JavaRDD<String> newData = linesRDD.map(new Function<String, String>() {
    public String call(String s) {
        return s.trim().toUpperCase();
    }
});

FlatMap Transformation Scala Example
val result = data.flatMap (line => line.split(" ") )

FlatMap Transformation Java Example
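// Requires: java.util.Arrays, java.util.Iterator, org.apache.spark.api.java.function.FlatMapFunction
// linesRDD is the JavaRDD<String> created in the Map example above
JavaRDD<String> result = linesRDD.flatMap(new FlatMapFunction<String, String>() {
    public Iterator<String> call(String s) {
        return Arrays.asList(s.split(" ")).iterator();
    }
});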
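
The difference between the two transformations is in the shape of the output: map produces exactly one output element per input element, while flatMap may produce zero or more. A loose shell analogy, offered only as an illustration and not as Spark code, treats each input line as one element; the function names map_upper and flatmap_words are made up for this sketch.

```shell
# map analogue: one output line per input line (upper-case each line).
map_upper() { tr '[:lower:]' '[:upper:]'; }

# flatMap analogue: each input line yields one output line per space-separated word.
flatmap_words() { tr ' ' '\n'; }

# Two input lines stay two lines under map_upper but become five under flatmap_words:
#   printf 'hello world\nspark shell demo\n' | map_upper
#   printf 'hello world\nspark shell demo\n' | flatmap_words
```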

Deploying Jobs to YARN

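2 Types of Deployment Modes
1) Client Mode: the driver runs on the machine where spark-submit is invoked
2) Cluster Mode: the driver runs inside the cluster (on YARN, in the ApplicationMaster)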

$ ./bin/spark-submit --class com.WordCount2 --master yarn --deploy-mode client --executor-cores 1 --num-executors 1 /home/hduser/Desktop/1.6\ SPARK/WordCount.jar

$ ./bin/spark-submit --class com.WordCount2 --master yarn --deploy-mode cluster --executor-cores 1 --num-executors 1 /home/hduser/Desktop/1.6\ SPARK/WordCount.jar

In order to submit an application to the Standalone cluster manager, pass spark://masternode:7077 as the master argument to spark-submit. An application can be submitted to the master as follows:

$ spark-submit --class "WordCount" --master spark://192.168.213.133:7077 target/scala-2.10/wordcount-spark-application_2.10-1.0.jar

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://sapna-All-Series:7077 --executor-memory 1G --total-executor-cores 1 /home/sapna/spark-2.0.0-bin-hadoop2.6/examples/jars/spark-examples_2.11-2.0.0.jar 10
--class: The entry point for your application (the main class).
--master: The master URL for the cluster.
--executor-memory: Memory to allocate per executor (e.g. 1G).
--total-executor-cores: Total number of CPU cores to allocate to the application across all executors.

$ spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
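
The template above can be filled in programmatically. The sketch below assembles the command line and prints it without running anything, which is handy for checking argument order. build_submit is a hypothetical helper, and it does not quote its arguments, so paths that contain spaces need separate handling.

```shell
# build_submit MAIN_CLASS MASTER_URL DEPLOY_MODE APP_JAR [APP_ARGS...]
# Prints a spark-submit command line following the template above (dry run).
# (hypothetical helper; arguments are not quoted in the output)
build_submit() {
  main_class=$1; master=$2; mode=$3; jar=$4
  shift 4
  printf 'spark-submit --class %s --master %s --deploy-mode %s %s' \
    "$main_class" "$master" "$mode" "$jar"
  for arg in "$@"; do
    printf ' %s' "$arg"          # application arguments always come last
  done
  printf '\n'
}

# Usage:
#   build_submit org.apache.spark.examples.SparkPi spark://hadoopmaster:7077 client app.jar 10
```

Options placed after the application jar are passed to the application rather than to spark-submit, which is why the jar and its arguments are emitted last.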

Some More Command Examples:

$ bin/spark-submit --class sparkWordCount.WordCount --master spark://hadoopmaster:7077 /home/hduser/sparkSpace/WordCount.jar /user/hduser/sparkSpace/sample.txt /user/hduser/sparkSpace/sampleOutput1

$ hdfs dfs -ls /user/hduser/sparkSpace/sampleOutput1

$ hdfs dfs -get /user/hduser/sparkSpace/sampleOutput1 /home/hduser/sparkSpace/

$ spark-submit --class wordCount.WordCount --master spark://hadoopmaster:7077 --conf spark.default.parallelism=12 --executor-cores 12 --total-executor-cores 12 /home/hduser/sparkSpace/SparkJavaProject.jar /user/hduser/sparkSpace/gutenberg/sample.txt /user/hduser/sparkSpace/sampleOutput
