Spark for Machine Learning and AI

Introduction to The Post

This post describes how to use the Apache Spark platform for machine learning. It starts by reviewing the basics of the DataFrame data structure, then covers pre-processing of both numeric and text data so that it is ready to use with Spark’s MLlib machine learning library. It also describes several algorithms for clustering, classification, and regression, and ends with a brief look at recommendation systems.

Introduction to Spark

Spark is a distributed data processing platform for big data. Distributed means Spark runs on a cluster of servers, and data processing means it performs computations such as ETL and modelling. In the case of Spark, some of the most interesting computations are related to machine learning and data analysis. Big data is a term broadly applied to data sets that are not easily analyzed on a single server or with older data management systems designed to run on a single server. Spark is becoming increasingly polyglot, with support for multiple languages: software engineers familiar with Scala and Java can use those languages, while data scientists who prefer Python and R can work with those.

This post uses Python as the programming language. Spark has a modular architecture that allows for multiple components or packages. These include MLlib for machine learning, Spark SQL for relational querying, Spark Streaming for continuous processing of streaming data, and GraphX for graph analysis, such as social network analysis. Spark is a general-purpose computation platform designed to manage large data sets. It has found use in a wide range of industries and applications, including real-time monitoring of financial data, text analysis for competitive intelligence and compliance, analyzing how customers use eCommerce sites, and healthcare applications such as genome analysis.

Steps in Machine Learning Process

There are three broad steps in the machine learning process.

The first is preprocessing, which includes collecting, reformatting, and transforming data so that it can be readily used by machine learning algorithms.

The second step is model building, in which machine learning algorithms are applied to training data to build models. Models are pieces of code that capture the information implicit in training data.

The last step is validation, in which we measure how well the models perform. There are multiple ways to measure performance. The preprocessing phase includes extracting, transforming, and loading data, similar to the ETL process used in business intelligence and data warehousing.

Creating Spark Session and Basic Dataframe Processing

The following code imports the packages needed to use Spark commands:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp

Create Spark Session

# Create Session
spark = SparkSession.builder.master('yarn').appName("spark_basic").getOrCreate()
# Configure the Session
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set('spark.executor.cores', '3')
spark.conf.set('spark.cores.max', '3')
spark.conf.set("spark.driver.memory",'8g')

Basic Dataframe Processing

# Set the file path
address = "hdfs://10.22.17.39:9000"
sales_path = f"{address}/data/sales/"

There are three ways to read CSV files:

# 1st way to read csv file
df = spark.read.csv(f"{sales_path}RK_B_TRANSACTION_WTCTW_201701_000000.csv")

# 2nd way to read csv file, similar to 3rd way
df2 = spark.read.format("csv").option("header", "true").load(f"{sales_path}RK_B_TRANSACTION_WTCTW_201701_000000.csv")

# 3rd way to read csv file, similar to 2nd way
# Suggested way to read, many options to specify
df3 = spark.read.load(f"{sales_path}RK_B_TRANSACTION_WTCTW_201701_000000.csv",
format="csv", sep="|", inferSchema="true", header="true", timestampFormat="yyyy.MM.dd HH:mm:ss")

Printing Schema:

# Basic
df

# A bit more detail
df.schema
display(df3)   # display() is available in notebook environments such as Databricks

# More structured detail
df.printSchema()

# Show the data type of each column
df.dtypes

# Show a summary of calculated values like MAX, MIN, MEAN, COUNT for each column
df.describe().show()

# Explain the physical plan for the dataframe
df.explain()

Getting Some Contents:

# Print column names
df.columns

# Show the dataframe
df.show()

# Show the first row of the dataframe
df.first()

# Get the first 5 rows
df.take(5)

# Count the number of rows
df.count()

# Sample roughly 10% of the data (without replacement)
sample_df = df.sample(False, 0.1)
sample_df.count()

Basic Queries:

# Filtering contents
emp_mgrs_df = df.filter("salary >= 100000")

# Selecting columns to show
emp_mgrs_df.select("salary").show()

A bit more advanced query examples:

# Select unique values within a column and sort them in ascending order
df.select("PRODUCT_KEY").distinct().orderBy("PRODUCT_KEY").show()

# Filter by transaction type and select some useful columns
df.filter(df['TRANSACTION_TYPE_NAME'] == 'Item').select('PRODUCT_KEY', 'TRANSACTION_ID', 'ORDER_NUM', 'ITEM_QUANTITY_VAL', 'ITEM_AMT', 'ITEM_UNIT_PRICE_AMT', 'TRANSACTION_DT')

# Select useful columns, group them by product key, then calculate the sum of quantity for each key and sort in descending order
df.select("PRODUCT_KEY", "ITEM_QUANTITY_VAL").groupBy("PRODUCT_KEY").sum("ITEM_QUANTITY_VAL").sort("sum(ITEM_QUANTITY_VAL)", ascending = False).show()

Some Useful Functions:

import subprocess
from pyspark.sql.functions import date_format

# This allows the program to retrieve results from the terminal (list the files on HDFS)
def get_path(path):
    arguments = "hdfs dfs -ls " + path + " | awk '{print $8}'"
    proc = subprocess.Popen(arguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)

    s_output, s_err = proc.communicate()
    all_files_path = s_output.decode('utf-8').split()

    return all_files_path

# Transform date values and add some new columns to display
# (filter_sales is a helper, not shown here, that keeps only the relevant sales rows)
def expand_date_n_sales(dataframe):
    filtered_sales = filter_sales(dataframe)
    expanded = filtered_sales.select("PRODUCT_KEY", "ITEM_QUANTITY_VAL", "TRANSACTION_DT",
                                     date_format('TRANSACTION_DT', 'Y').alias('year'),
                                     date_format('TRANSACTION_DT', 'M').alias('month'),
                                     date_format('TRANSACTION_DT', 'D').alias('day'),
                                     date_format('TRANSACTION_DT', 'W').alias('week_no'))

    return expanded

# Loop through all files, load the data, and merge it together
# (load_data is a helper, not shown here, that reads one file into a dataframe)
def get_all_cleaned_sales(address, path):
    all_files_path = get_path(path)

    for i in range(len(all_files_path)):
        if i == 0:
            first_raw_df = load_data(address, all_files_path[i])
            df = expand_date_n_sales(first_raw_df)
        else:
            raw_df = load_data(address, all_files_path[i])
            tmp_df = expand_date_n_sales(raw_df)
            df = df.union(tmp_df)

    return df

Components of Spark MLlib

The MLlib package has three types of functions.

The first is machine learning algorithms. The current set includes algorithms for classification, which is used to categorize things, such as predicting whether a customer is likely to leave for a competitor; regression, which is used to predict a numeric value like a home price; and clustering, which groups similar items together. Unlike classification, clustering has no predefined groups, so it is really useful when exploring data. Finally, there’s topic modeling, which is a way to identify themes in text.

The second group is workflows. Workflow components help organize commonly used steps, like pre-processing operations and tuning, into pipelines. This makes it easy to run a sequence of steps repeatedly while varying some parameters of the process, as sketched below.
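
A minimal workflow sketch using pyspark.ml.Pipeline. The column names (f1, f2, f3, category) and DataFrames (train_df, test_df) here are hypothetical placeholders, not from a dataset used in this post:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier

# Hypothetical stages: assemble numeric columns, index a string label, fit a classifier
assembler = VectorAssembler(inputCols = ["f1", "f2", "f3"], outputCol = "features")
indexer = StringIndexer(inputCol = "category", outputCol = "label")
dt = DecisionTreeClassifier(labelCol = "label", featuresCol = "features")

# The stages run in order; fitting the pipeline fits each stage on the training data
pipeline = Pipeline(stages = [assembler, indexer, dt])
# pipeline_model = pipeline.fit(train_df)
# predictions = pipeline_model.transform(test_df)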

Utilities are lower-level functions that give you access to distributed linear algebra and statistics, as in the example below.
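
For instance, a small sketch of the statistics utilities: pyspark.ml.stat can compute a correlation matrix over a vector column. This assumes a DataFrame with a vector column named "features", such as the viris_df built with VectorAssembler in the classification section later in this post:

from pyspark.ml.stat import Correlation

# Pearson correlation matrix of the assembled feature vector
# (viris_df is assumed to be a DataFrame with a "features" vector column)
corr_matrix = Correlation.corr(viris_df, "features").head()[0]
print(corr_matrix)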

Introduction to Preprocessing

There are two types of pre-processing, numeric and text pre-processing.

Numeric

Normalisation (MinMaxScaler)

Normalising maps data values from their original range to the range zero to one. It’s used to avoid problems when some attributes have large ranges and others have small ranges; for example, salaries have a large range, but years of employment has a small range.
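
A minimal MinMaxScaler sketch using a small made-up salary dataset (it assumes a Spark session named spark already exists). Note that the scaler operates on a vector column, so the raw columns are assembled first:

from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# Toy data: (salary, years_employed)
df = spark.createDataFrame([(50000.0, 2.0), (120000.0, 10.0), (80000.0, 5.0)],
                           ["salary", "years_employed"])

assembler = VectorAssembler(inputCols = ["salary", "years_employed"], outputCol = "features")
features_df = assembler.transform(df)

# Rescale each feature to the range [0, 1]
scaler = MinMaxScaler(inputCol = "features", outputCol = "scaled_features")
scaler_model = scaler.fit(features_df)
scaler_model.transform(features_df).select("features", "scaled_features").show(truncate = False)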

Standardisation (StandardScaler)

Standardising rescales data values so that they have a mean of zero and a standard deviation of one. It does not change the overall shape of the distribution; it simply centres and rescales it. It’s used when attributes have different scales and the machine learning algorithm you’re using assumes roughly normally distributed data.
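
A minimal StandardScaler sketch, continuing with the features_df vector column built in the MinMaxScaler example above:

from pyspark.ml.feature import StandardScaler

# Centre each feature at zero (withMean) and scale to unit standard deviation (withStd)
std_scaler = StandardScaler(inputCol = "features", outputCol = "std_features",
                            withMean = True, withStd = True)
std_model = std_scaler.fit(features_df)
std_model.transform(features_df).select("features", "std_features").show(truncate = False)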

Partition (Bucketiser)

Partitioning maps continuous data values into buckets, like the bins of a histogram. Deciles and percentiles are examples of buckets. It’s useful when you want to work with groups of values instead of a continuous range of values.
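
A minimal Bucketizer sketch; the split points and the input column name "value" are made up for illustration, and a Spark session named spark is assumed:

from pyspark.ml.feature import Bucketizer

df = spark.createDataFrame([(-5.0,), (3.0,), (40.0,), (250.0,)], ["value"])

# Each value is assigned to the bucket defined by the split points below
splits = [-float("inf"), 0.0, 10.0, 100.0, float("inf")]
bucketizer = Bucketizer(splits = splits, inputCol = "value", outputCol = "bucket")
bucketizer.transform(df).show()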

Text

Tokenisation (Tokeniser)

This transformation maps text from a single string to a set of tokens, or words. For example, the sentence “This is a sentence” can be mapped into the four-word list shown below.

["This", "is", "a", "sentence"]

Term Frequency Inverse Document Frequency (TF-IDF) - (Hashing TF)

This method maps text from a single, typically long, string to a vector indicating the frequency of each word in a text relative to a group of texts, such as a corpus. This transformation is widely used in text classification. TF-IDF captures the intuition that infrequently used words are more useful for distinguishing categories of text than frequently used words.
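
A minimal TF-IDF sketch using HashingTF followed by IDF, continuing from the tokenised sentences_df built in the Tokenizer sketch above (numFeatures is kept deliberately small here just for illustration):

from pyspark.ml.feature import HashingTF, IDF

words_df = tokenizer.transform(sentences_df)

# Term frequencies, hashed into a fixed-size feature vector
hashing_tf = HashingTF(inputCol = "words", outputCol = "raw_features", numFeatures = 20)
tf_df = hashing_tf.transform(words_df)

# Down-weight terms that appear in many documents
idf = IDF(inputCol = "raw_features", outputCol = "features")
idf_model = idf.fit(tf_df)
idf_model.transform(tf_df).select("words", "features").show(truncate = False)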

Introduction to Clustering

Often when working with new data sets, it helps to explore the data and look for macro-level structures such as broad clusters of data. Clustering algorithms group data into clusters that allow us to see how large data sets can break down into distinct subgroups. K-means is widely used and works well for finding clusters in small and mid-sized data sets. For large data sets, the Bisecting K-means algorithm can be faster.

K-means Clustering

Don’t forget to create a spark session before using spark!

Import some essential spark packages:

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

Create a dataframe:

cluster_df = spark.read.csv("./ex/Ch03/03_02/clustering_dataset.csv", header = True, inferSchema = True)
# Show all 75 rows of data
cluster_df.show(75)

Transform data to a feature vector:

vectorAssembler = VectorAssembler(inputCols = ["col1", "col2", "col3"], outputCol = "features")
vcluster_df = vectorAssembler.transform(cluster_df)

Setup K-means algorithm:

# Set the cluster number
kmeans = KMeans().setK(3)

# Set the random seed, which controls where the algorithm starts
kmeans = kmeans.setSeed(1)

# Fit the data
kmodel = kmeans.fit(vcluster_df)

# Find the centers of the clusters
centers = kmodel.clusterCenters()

centers

Output:

[array([35.88461538, 31.46153846, 34.42307692]),
array([5.12, 5.84, 4.84]),
array([80. , 79.20833333, 78.29166667])]

Hierarchical Clustering

Import some essential spark packages:

from pyspark.ml.clustering import BisectingKMeans

Setup Bisecting KMeans algorithm:

bkmeans = BisectingKMeans().setK(3)
bkmeans = bkmeans.setSeed(1)
bkmodel = bkmeans.fit(vcluster_df)
bkcenters = bkmodel.clusterCenters()

Output:

[array([5.12, 5.84, 4.84]),
array([35.88461538, 31.46153846, 34.42307692]),
array([80. , 79.20833333, 78.29166667])]

Introduction to Classification

Classification algorithms are useful when we have datasets that we want to be able to split into different categories. So, for example, we might have a number of pieces of data that fall into Category A or Category B, and sometimes it’s not so obvious where certain things should fall. Classification algorithms help us identify boundaries between different categories and make it easy for us to then decide how to assign a new entity to a particular category.

Preprocessing The Iris Dataset

Import some essential spark packages:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

Create A Spark Session:

spark = SparkSession.builder.master('local').appName("spark_basic").getOrCreate()

Create Spark Dataframe:

iris_df = spark.read.csv("iris.data", inferSchema = True)
iris_df.show()

Output:

+---+---+---+---+-----------+
|_c0|_c1|_c2|_c3| _c4|
+---+---+---+---+-----------+
|5.1|3.5|1.4|0.2|Iris-setosa|
|4.9|3.0|1.4|0.2|Iris-setosa|
|4.7|3.2|1.3|0.2|Iris-setosa|
|4.6|3.1|1.5|0.2|Iris-setosa|
|5.0|3.6|1.4|0.2|Iris-setosa|
|5.4|3.9|1.7|0.4|Iris-setosa|
|4.6|3.4|1.4|0.3|Iris-setosa|
|5.0|3.4|1.5|0.2|Iris-setosa|
|4.4|2.9|1.4|0.2|Iris-setosa|
|4.9|3.1|1.5|0.1|Iris-setosa|
|5.4|3.7|1.5|0.2|Iris-setosa|
|4.8|3.4|1.6|0.2|Iris-setosa|
|4.8|3.0|1.4|0.1|Iris-setosa|
|4.3|3.0|1.1|0.1|Iris-setosa|
|5.8|4.0|1.2|0.2|Iris-setosa|
|5.7|4.4|1.5|0.4|Iris-setosa|
|5.4|3.9|1.3|0.4|Iris-setosa|
|5.1|3.5|1.4|0.3|Iris-setosa|
|5.7|3.8|1.7|0.3|Iris-setosa|
|5.1|3.8|1.5|0.3|Iris-setosa|
+---+---+---+---+-----------+
only showing top 20 rows

Rename all columns:

iris_df = iris_df.select(col("_c0").alias("sepal_length"),
col("_c1").alias("sepal_width"),
col("_c2").alias("petal_length"),
col("_c3").alias("petal_width"),
col("_c4").alias("species")
)

iris_df.show()

Output:

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width| species|
+------------+-----------+------------+-----------+-----------+
| 5.1| 3.5| 1.4| 0.2|Iris-setosa|
| 4.9| 3.0| 1.4| 0.2|Iris-setosa|
| 4.7| 3.2| 1.3| 0.2|Iris-setosa|
| 4.6| 3.1| 1.5| 0.2|Iris-setosa|
| 5.0| 3.6| 1.4| 0.2|Iris-setosa|
| 5.4| 3.9| 1.7| 0.4|Iris-setosa|
| 4.6| 3.4| 1.4| 0.3|Iris-setosa|
| 5.0| 3.4| 1.5| 0.2|Iris-setosa|
| 4.4| 2.9| 1.4| 0.2|Iris-setosa|
| 4.9| 3.1| 1.5| 0.1|Iris-setosa|
| 5.4| 3.7| 1.5| 0.2|Iris-setosa|
| 4.8| 3.4| 1.6| 0.2|Iris-setosa|
| 4.8| 3.0| 1.4| 0.1|Iris-setosa|
| 4.3| 3.0| 1.1| 0.1|Iris-setosa|
| 5.8| 4.0| 1.2| 0.2|Iris-setosa|
| 5.7| 4.4| 1.5| 0.4|Iris-setosa|
| 5.4| 3.9| 1.3| 0.4|Iris-setosa|
| 5.1| 3.5| 1.4| 0.3|Iris-setosa|
| 5.7| 3.8| 1.7| 0.3|Iris-setosa|
| 5.1| 3.8| 1.5| 0.3|Iris-setosa|
+------------+-----------+------------+-----------+-----------+

Transform the feature columns into a single feature vector:

vectorAssembler = VectorAssembler(inputCols = ["sepal_length", "sepal_width", "petal_length", "petal_width"], outputCol = "features")
viris_df = vectorAssembler.transform(iris_df)

viris_df.show()

Output:

+------------+-----------+------------+-----------+-----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width| species| features|
+------------+-----------+------------+-----------+-----------+-----------------+
| 5.1| 3.5| 1.4| 0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
| 4.9| 3.0| 1.4| 0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|
| 4.7| 3.2| 1.3| 0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|
| 4.6| 3.1| 1.5| 0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|
| 5.0| 3.6| 1.4| 0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|
| 5.4| 3.9| 1.7| 0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|
| 4.6| 3.4| 1.4| 0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|
| 5.0| 3.4| 1.5| 0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|
| 4.4| 2.9| 1.4| 0.2|Iris-setosa|[4.4,2.9,1.4,0.2]|
| 4.9| 3.1| 1.5| 0.1|Iris-setosa|[4.9,3.1,1.5,0.1]|
| 5.4| 3.7| 1.5| 0.2|Iris-setosa|[5.4,3.7,1.5,0.2]|
| 4.8| 3.4| 1.6| 0.2|Iris-setosa|[4.8,3.4,1.6,0.2]|
| 4.8| 3.0| 1.4| 0.1|Iris-setosa|[4.8,3.0,1.4,0.1]|
| 4.3| 3.0| 1.1| 0.1|Iris-setosa|[4.3,3.0,1.1,0.1]|
| 5.8| 4.0| 1.2| 0.2|Iris-setosa|[5.8,4.0,1.2,0.2]|
| 5.7| 4.4| 1.5| 0.4|Iris-setosa|[5.7,4.4,1.5,0.4]|
| 5.4| 3.9| 1.3| 0.4|Iris-setosa|[5.4,3.9,1.3,0.4]|
| 5.1| 3.5| 1.4| 0.3|Iris-setosa|[5.1,3.5,1.4,0.3]|
| 5.7| 3.8| 1.7| 0.3|Iris-setosa|[5.7,3.8,1.7,0.3]|
| 5.1| 3.8| 1.5| 0.3|Iris-setosa|[5.1,3.8,1.5,0.3]|
+------------+-----------+------------+-----------+-----------+-----------------+

Convert the string species values into numeric labels:

indexer = StringIndexer(inputCol = "species", outputCol = "label")
iviris_df = indexer.fit(viris_df).transform(viris_df)

iviris_df.show()

Output:

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width| species| features|label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
| 5.1| 3.5| 1.4| 0.2|Iris-setosa|[5.1,3.5,1.4,0.2]| 0.0|
| 4.9| 3.0| 1.4| 0.2|Iris-setosa|[4.9,3.0,1.4,0.2]| 0.0|
| 4.7| 3.2| 1.3| 0.2|Iris-setosa|[4.7,3.2,1.3,0.2]| 0.0|
| 4.6| 3.1| 1.5| 0.2|Iris-setosa|[4.6,3.1,1.5,0.2]| 0.0|
| 5.0| 3.6| 1.4| 0.2|Iris-setosa|[5.0,3.6,1.4,0.2]| 0.0|
| 5.4| 3.9| 1.7| 0.4|Iris-setosa|[5.4,3.9,1.7,0.4]| 0.0|
| 4.6| 3.4| 1.4| 0.3|Iris-setosa|[4.6,3.4,1.4,0.3]| 0.0|
| 5.0| 3.4| 1.5| 0.2|Iris-setosa|[5.0,3.4,1.5,0.2]| 0.0|
| 4.4| 2.9| 1.4| 0.2|Iris-setosa|[4.4,2.9,1.4,0.2]| 0.0|
| 4.9| 3.1| 1.5| 0.1|Iris-setosa|[4.9,3.1,1.5,0.1]| 0.0|
| 5.4| 3.7| 1.5| 0.2|Iris-setosa|[5.4,3.7,1.5,0.2]| 0.0|
| 4.8| 3.4| 1.6| 0.2|Iris-setosa|[4.8,3.4,1.6,0.2]| 0.0|
| 4.8| 3.0| 1.4| 0.1|Iris-setosa|[4.8,3.0,1.4,0.1]| 0.0|
| 4.3| 3.0| 1.1| 0.1|Iris-setosa|[4.3,3.0,1.1,0.1]| 0.0|
| 5.8| 4.0| 1.2| 0.2|Iris-setosa|[5.8,4.0,1.2,0.2]| 0.0|
| 5.7| 4.4| 1.5| 0.4|Iris-setosa|[5.7,4.4,1.5,0.4]| 0.0|
| 5.4| 3.9| 1.3| 0.4|Iris-setosa|[5.4,3.9,1.3,0.4]| 0.0|
| 5.1| 3.5| 1.4| 0.3|Iris-setosa|[5.1,3.5,1.4,0.3]| 0.0|
| 5.7| 3.8| 1.7| 0.3|Iris-setosa|[5.7,3.8,1.7,0.3]| 0.0|
| 5.1| 3.8| 1.5| 0.3|Iris-setosa|[5.1,3.8,1.5,0.3]| 0.0|
+------------+-----------+------------+-----------+-----------+-----------------+-----+

Naive Bayes Classification

Import some essential spark packages:

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Split the dataset into train and test datasets:

splits = iviris_df.randomSplit([0.6, 0.4], 1)
train_df = splits[0]
test_df = splits[1]

Train the model using the Naive Bayes classifier and make predictions:

nb = NaiveBayes(modelType = "multinomial")
nbmodel = nb.fit(train_df)

predictions_df = nbmodel.transform(test_df)
predictions_df.take(1)

Output:

[Row(sepal_length=4.5, sepal_width=2.3, petal_length=1.3, petal_width=0.3, species='Iris-setosa', features=DenseVector([4.5, 2.3, 1.3, 0.3]), label=0.0, rawPrediction=DenseVector([-10.3605, -11.0141, -11.7112]), probability=DenseVector([0.562, 0.2924, 0.1456]), prediction=0.0)]

Evaluate the accuracy:

evaluator = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
nb_accuracy = evaluator.evaluate(predictions_df)

nb_accuracy

Output:

0.5862068965517241

Multilayer Perceptron Classification

Import some essential spark packages:

from pyspark.ml.classification import MultilayerPerceptronClassifier

Set the layers, then train the Multilayer Perceptron classifier and make predictions:

# A four-layer multilayer perceptron:
# the input layer has 4 neurons, the two hidden layers have 5 neurons each, and the output layer has 3 neurons
layers = [4, 5, 5, 3]

mlp = MultilayerPerceptronClassifier(layers = layers, seed = 1)
mlp_model = mlp.fit(train_df)
mlp_predictions = mlp_model.transform(test_df)

Evaluate the result:

mlp_evaluator = MulticlassClassificationEvaluator(metricName = "accuracy")
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)

mlp_accuracy

Output:

0.9482758620689655

Decision Trees Classification

Import some essential spark packages:

from pyspark.ml.classification import DecisionTreeClassifier

Train the model using the Decision Tree classifier and make predictions:

dt = DecisionTreeClassifier(labelCol = "label", featuresCol = "features")
dt_model = dt.fit(train_df)

dt_predictions = dt_model.transform(test_df)

Evaluate the result:

dt_evaluator = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
dt_accuracy = dt_evaluator.evaluate(dt_predictions)

dt_accuracy

Output:

0.9310344827586207

Introduction to Regression

Regression techniques allow us to make predictions about numeric values. For example, if we have a product whose price has been steadily rising over time, we might want to estimate what the price will be in the future. We could look at prices over a period of time and try to fit a line to those price points. That line is useful because it extends into the future, so we can use it to make projections about what the price might be at some future point.

Pre-processing The Dataset

Import some essential spark packages:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

Create a spark session:

spark = SparkSession.builder.master('local').appName("spark_basic").getOrCreate()

Read the CSV file:

pp_df = spark.read.csv("power_plant.csv")
pp_df

Output:

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string]

Read the CSV file again, this time with header and schema inference so the columns get proper types:

pp_df = spark.read.csv("power_plant.csv", header = True, inferSchema = True)
pp_df

Output:

DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]

Creating a feature vector:

Import some essential spark packages:

from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ["AT", "V", "AP", "RH"], outputCol = "features")
vpp_df = vectorAssembler.transform(pp_df)

vpp_df.take(1)

Output:

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26, features=DenseVector([14.96, 41.76, 1024.07, 73.17]))]

Linear Regression

Import some essential spark packages:

from pyspark.ml.regression import LinearRegression

Train the model using Linear Regression:

lr = LinearRegression(featuresCol = "features", labelCol = "PE")
lr_model = lr.fit(vpp_df)

Coefficients:

lr_model.coefficients

Output:

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

Intercept:

lr_model.intercept

Output:

454.6092744523414

Root Mean Squared Error:

lr_model.summary.rootMeanSquaredError

Output:

4.557126016749488

Save the model:

lr_model.save("lr1.model")
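
To load the saved model back later, the matching model class provides a load method (a brief sketch):

from pyspark.ml.regression import LinearRegressionModel

loaded_model = LinearRegressionModel.load("lr1.model")
loaded_model.coefficients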

Decision Tree Regression

Import some essential spark packages:

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

Split the dataset into train and test datasets:

splits = vpp_df.randomSplit([0.7, 0.3])

train_df = splits[0]
test_df = splits[1]

Train the model and make the predictions using Decision Tree Regression:

dt = DecisionTreeRegressor(featuresCol = "features", labelCol = "PE")
dt_model = dt.fit(train_df)

dt_predictions = dt_model.transform(test_df)

Evaluate the result:

dt_evaluator = RegressionEvaluator(labelCol = "PE", predictionCol = "prediction", metricName = "rmse")
rmse = dt_evaluator.evaluate(dt_predictions)

rmse

Output:

4.459494278528065

Gradient-boosted Tree Regression

Import some essential spark packages:

from pyspark.ml.regression import GBTRegressor

Train the model and make the predictions using Gradient-boosted Tree Regression:

gbt = GBTRegressor(featuresCol = "features", labelCol = "PE")
gbt_model = gbt.fit(train_df)

gbt_predictions = gbt_model.transform(test_df)

Evaluate the result:

gbt_evaluator = RegressionEvaluator(labelCol = "PE", predictionCol = "prediction", metricName = "rmse")
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)

gbt_rmse

Output:

3.976988494544201

Understand Recommendation Systems

A common problem in machine learning is making recommendations. There are two general ways of doing this. One is called collaborative filtering. Imagine you run an online bookstore with a number of customers, and these customers all like reading both the brown book and the red book. Now a new customer comes along and indicates that they really enjoyed reading the red book. What other books can we recommend to them? Definitely the brown book, since other people who have read the red book also enjoyed the brown book. This is an example of collaborative filtering.

Another way to make recommendations is based on the properties of the items you’re working with. For example, if a customer really enjoys reading sci-fi, we might want to recommend other science fiction books to them, but not necessarily biographies.

Spark MLlib supports collaborative filtering, and it works by filling in something known as the user-item matrix. We can think of users as customers and items as books. In the example below, user one likes items one, two, and four, while user two likes items two and three. Notice that user four has something in common with both user one and user three. That means we probably want to recommend item two to user four. This is the type of recommendation system that Spark MLlib supports.

          Item1   Item2   Item3   Item4
User1       x       x               x
User2               x       x
User3                               x
User4       x       ?               x

Collaborative Filtering

Collaborative filtering follows the same patterns we’ve used repeatedly in this post.

First we start with preprocessing. We’re going to use the alternating least squares (ALS) method provided by Spark MLlib; to use it, we just import ALS from the pyspark.ml.recommendation package and then build a DataFrame of user-item ratings.

When it comes to modeling, we create an ALS object and specify the user, item, and rating columns of our DataFrame. We then train the model using fit, which is a method of the ALS object. When it’s time to evaluate, we create predictions by applying the transform method of the ALS model to our test data. We create a RegressionEvaluator object and use its evaluate function to calculate the root mean squared error, which gives us a measure of how well our collaborative filtering is making recommendations. A minimal sketch of this workflow is shown below.
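
This sketch uses a small made-up ratings DataFrame; the userId, itemId, and rating column names are placeholders, a Spark session named spark is assumed, and coldStartStrategy = "drop" is set so that rows the model cannot score are dropped before evaluation:

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Hypothetical explicit ratings: (userId, itemId, rating)
ratings_df = spark.createDataFrame(
    [(1, 1, 5.0), (1, 2, 4.0), (1, 4, 4.0),
     (2, 2, 3.0), (2, 3, 4.0),
     (3, 4, 5.0), (4, 1, 4.0), (4, 4, 5.0)],
    ["userId", "itemId", "rating"])

train_df, test_df = ratings_df.randomSplit([0.8, 0.2], seed = 1)

als = ALS(userCol = "userId", itemCol = "itemId", ratingCol = "rating",
          coldStartStrategy = "drop")
als_model = als.fit(train_df)

predictions = als_model.transform(test_df)
evaluator = RegressionEvaluator(labelCol = "rating", predictionCol = "prediction",
                                metricName = "rmse")
print(evaluator.evaluate(predictions))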

Tips for Using Spark MLlib

Let’s review some tips for working with Spark MLlib.

There are three basic stages of building machine learning models. There’s a pre-processing phase where we collect, reformat, and transform the data. Once we have that data, we can build our models using a variety of machine learning algorithms. And then we want to evaluate the models to assess their quality.

With that framework in mind, let’s look at some tips to make each of these stages go smoothly. First, when we’re pre-processing, we want to first load our data into DataFrames. If you’re working with text files, it helps to have headers or column names in the text file. When you read a file, make sure you use the inferSchema = True option. That’ll make sure that things like dates and numeric values get mapped to their appropriate data type. Use the VectorAssembler to create feature vectors and the StringIndexer to map from strings to numeric indexes.

During the model building phase, make sure to split your data into training and test sets. We use the training data to fit our models and the test data to create predictions with transform. When we’re done building models, we want to validate them; using the MLlib evaluators is recommended. The two that we looked at were the MulticlassClassificationEvaluator and the RegressionEvaluator. Just be sure to use the right one for the kind of algorithm you’re working with. Also, be sure to experiment with multiple algorithms: once you’ve gone through the pre-processing phase, it’s very easy to test other algorithms, so take advantage of that. Finally, vary the hyperparameters of the algorithms you’re working with; sometimes you can get slightly better performance just by changing a hyperparameter.

Where do we go from here? First, I’d recommend consulting the MLlib documentation. It’s really high-quality documentation that provides details on the APIs and includes extensive examples. When you’re ready to work with other data sets, look at the Kaggle website, which has both machine learning data sets and articles about machine learning. Spark is designed for working with big data, so if you’re ready to work with machine learning at big-data scale, consult the AWS public data sets, which are freely available from the AWS cloud service. Spark and MLlib are both under active development, so as you go forward working with MLlib, be sure to check back at the Spark MLlib website for updates and new features.

Author: Zilan Huang
Link: http://hoanjinan.github.io/2019/08/20/Spark-for-Machine-Learning-AI/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stated otherwise.