Introduction to The Post
This post describes how to use the Apache Spark platform for machine learning. It starts by reviewing the basics of the DataFrame data structure. It then covers the pre-processing of both numeric and text data so that it is ready to use with Spark's MLlib machine learning library. The post also describes multiple algorithms for clustering, classification, and regression. Finally, it briefly describes a recommendation system.
Introduction to Spark
Spark is a distributed data processing platform for big data. Distributed means Spark runs on a cluster of servers, and data processing means it performs computations such as ETL and modelling. In the case of Spark, some of the most interesting computations are related to machine learning and data analysis. Big data is a term broadly applied to data sets that are not easily analyzed on a single server or with older data management systems designed to run on a single server. Spark is becoming increasingly polyglot, with support for multiple languages. Software engineers familiar with Scala and Java can use those languages, while data scientists who prefer Python and R can work with those.
This post will use Python as the programming language. Spark uses a modular architecture that allows for multiple components or packages. These include MLlib for machine learning, Spark SQL for relational querying, Spark Streaming for continuous processing of streaming data, and GraphX for graph analysis, such as social network analysis. Spark is a generalized computation platform designed to manage large data sets. It has found use in a wide number of industries and applications, including real-time monitoring of financial data, text analysis related to competitive intelligence and compliance, analyzing how customers use eCommerce sites, and healthcare applications such as analyzing genomes.
Steps in Machine Learning Process
There are three broad steps in the machine learning process.
The first is preprocessing, which includes collecting, reformatting, and transforming data, so that it’s readily used by machine learning algorithms.
The second step is model building, in which machine learning algorithms are applied to training data to build models. Models are pieces of code that capture the information implicit in training data.
The last step is validation, in which we measure how well models are performing. There are multiple ways to measure performance. The preprocessing phase includes extracting, transforming, and loading data; this is similar to the ETL process used in business intelligence and data warehousing.
Creating Spark Session and Basic Dataframe Processing
The following code imports the packages needed to use Spark commands:

```python
from pyspark import SparkContext, SparkConf
```
Create Spark Session
```python
# Create Session
```
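The block above is truncated to its first comment; a minimal sketch of creating the session, using the same builder call that appears later in this post:

```python
from pyspark.sql import SparkSession

# Create a local session; the application name is arbitrary
spark = SparkSession.builder.master('local').appName("spark_basic").getOrCreate()
```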
Basic Dataframe Processing
```python
# Set the file path
```

There are three ways to read CSV files:

```python
# 1st way to read csv file
```
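Only the leading comments of those blocks survive here; a sketch of three common ways to read a CSV file into a DataFrame. The file name employee.csv is a placeholder, and these variants are illustrative rather than the exact ones from the original blocks:

```python
# Set the file path (placeholder)
file_path = "./employee.csv"

# 1st way: the csv() reader with keyword options
df1 = spark.read.csv(file_path, header=True, inferSchema=True)

# 2nd way: the generic reader with an explicit format and options
df2 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_path)

# 3rd way: the load() shortcut, passing the format as an argument
df3 = spark.read.load(file_path, format="csv", header=True, inferSchema=True)
```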
Printing Schema:
```python
# Basic
```

Getting Some Contents:

```python
# Print column names
```

Basic Queries:

```python
# Filtering Contents
```

A bit more advanced query examples:

```python
# Select unique values within a column and sort it in ascending order
```

Some Useful Functions:

```python
# This allows the program to retrieve the results from the terminal
```
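Since only the first comment of each block is preserved above, here is a condensed sketch of the kinds of DataFrame operations they describe. The DataFrame name df and the salary and dept columns are hypothetical:

```python
# Basic: print the schema and the column names
df.printSchema()
print(df.columns)

# Getting some contents: show a few rows and count them
df.show(5)
print(df.count())

# Filtering contents
high_earners = df.filter(df.salary > 50000)

# Select unique values within a column and sort them in ascending order
depts = df.select("dept").distinct().orderBy("dept")

# collect() retrieves the results to the driver so they can be inspected from the terminal
rows = depts.collect()
```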
Components of Spark MLlib
The MLlib package has three types of functions.
The first is machine learning algorithms. The current set includes algorithms for classification, which is used for categorizing things, such as identifying a customer who is likely to leave for a competitor; regression, which is used for predicting a numeric value, like a home price; and clustering, which is used to group similar items together. Unlike classification, clustering has no predefined groups, so it is really useful when exploring data. Finally, there's topic modeling, which is a way to identify themes in text.
The second group is workflows. Workflow components help organize commonly used steps, like pre-processing operations and tuning. This makes it easy to run a sequence of steps repeatedly while varying some parameters of the process.
The third group is utilities: lower-level functions that give you access to distributed linear algebra and statistics functions.
Introduction to Preprocessing
There are two types of pre-processing, numeric and text pre-processing.
Numeric
Normalisation (MinMaxScaler)
Normalising maps data values from their original range to the range of zero to one. It’s used to avoid problems when some attributes have large ranges and others have small ranges. For example, salaries have a large range, but years of employment has a small range.
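As a minimal sketch: MinMaxScaler operates on a vector column produced by VectorAssembler, so the features_df DataFrame with a features vector column is an assumption here:

```python
from pyspark.ml.feature import MinMaxScaler

# Rescale each feature in the vector column to the range [0, 1]
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaled_df = scaler.fit(features_df).transform(features_df)
```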
Standardisation (StandardScaler)
Standardising rescales data values so that they have a mean of zero and a standard deviation of one. Most values then fall roughly between negative one and one, which is convenient when the data is approximately bell-curve shaped. It's used when attributes have different scales and the machine learning algorithm you're using assumes roughly normally distributed data.
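A minimal sketch, again assuming a features_df DataFrame with a features vector column:

```python
from pyspark.ml.feature import StandardScaler

# withMean centres each feature at zero; withStd scales it to unit standard deviation
scaler = StandardScaler(inputCol="features", outputCol="std_features", withMean=True, withStd=True)
std_df = scaler.fit(features_df).transform(features_df)
```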
Partition (Bucketizer)
Partitioning maps data values from continuous values to buckets, like histograms. Deciles and percentiles are examples of buckets. It’s useful when you want to work with groups of values instead of a continuous range of values.
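A minimal sketch; the values_df DataFrame, its value column, and the split points are all hypothetical:

```python
from pyspark.ml.feature import Bucketizer

# Splits define the bucket boundaries; -inf and +inf catch values outside the known range
splits = [-float("inf"), 0.0, 10.0, 20.0, float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="value", outputCol="bucket")
bucketed_df = bucketizer.transform(values_df)
```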
Text
Tokenisation (Tokenizer)
This transformation maps text from a single string to a set of tokens, or words. For example, the sentence "This is a sentence" can be mapped into the four-word list of tokens shown below.
1 | ["This", "is", "a", "sentence"] |
Term Frequency-Inverse Document Frequency (TF-IDF) - (HashingTF)
This method maps text from a single, typically long string, to a vector, indicating the frequency of each word in a text relative to a group of texts such as a corpus. This transformation is widely used in text classification. TF-IDF captures the intuition that infrequently used words are more useful for distinguishing categories of text than frequently used words.
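A minimal sketch, continuing from the tokenized_df DataFrame above (the feature size and column names are illustrative):

```python
from pyspark.ml.feature import HashingTF, IDF

# Map each list of words to a fixed-length vector of term frequencies
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=1000)
tf_df = hashing_tf.transform(tokenized_df)

# IDF down-weights words that appear in many documents
idf = IDF(inputCol="raw_features", outputCol="tfidf_features")
tfidf_df = idf.fit(tf_df).transform(tf_df)
```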
Introduction to Clustering
Often when working with new data sets, it helps to explore the data and look for macro-level structures such as broad clusters of data. Clustering algorithms group data into clusters that allow us to see how large data sets can break down into distinct subgroups. K-means is widely used and works well for finding clusters in small and mid-sized data sets. For large data sets, the Bisecting K-means algorithm can be faster.
K-means Clustering
Don't forget to create a Spark session before using Spark!
Import some essential spark packages:
```python
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
```

Create a dataframe:

```python
cluster_df = spark.read.csv("./ex/Ch03/03_02/clustering_dataset.csv", header = True, inferSchema = True)
```

Transform the data into a feature vector:

```python
vectorAssembler = VectorAssembler(inputCols = ["col1", "col2", "col3"], outputCol = "features")
```

Set up the K-means algorithm:

```python
# Set the cluster number
```

Output:

```
[array([35.88461538, 31.46153846, 34.42307692]),
```
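The setup block above is truncated and the output shows only the first cluster centre. A minimal sketch of the full flow that produces those centres (the vcluster_df name for the assembled DataFrame is an assumption):

```python
from pyspark.ml.clustering import KMeans

# Apply the assembler defined above to get a 'features' vector column
vcluster_df = vectorAssembler.transform(cluster_df)

# Set the cluster number and fit the model
kmeans = KMeans().setK(3).setSeed(1)
kmeans_model = kmeans.fit(vcluster_df)

# Each centre is returned as a NumPy array
print(kmeans_model.clusterCenters())
```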
Hierarchical Clustering
Import some essential spark packages:
```python
from pyspark.ml.clustering import BisectingKMeans
```

Set up the Bisecting K-means algorithm:

```python
bkmeans = BisectingKMeans().setK(3)
```

Output:

```
[array([5.12, 5.84, 4.84]),
```
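Bisecting K-means follows the same fit and clusterCenters pattern as K-means; a minimal sketch, reusing the vcluster_df DataFrame assumed above:

```python
# Fit the bisecting variant and inspect its cluster centres
bkmeans = BisectingKMeans().setK(3).setSeed(1)
bkmeans_model = bkmeans.fit(vcluster_df)
print(bkmeans_model.clusterCenters())
```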
Introduction to Classification
Classification algorithms are useful when we have datasets that we want to be able to split into different categories. So, for example, we might have a number of pieces of data that fall into Category A or Category B, and sometimes it’s not so obvious where certain things should fall. Classification algorithms help us identify boundaries between different categories and make it easy for us to then decide how to assign a new entity to a particular category.
Preprocessing The Iris Dataset
Import some essential spark packages:
```python
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
```

Create A Spark Session:

```python
spark = SparkSession.builder.master('local').appName("spark_basic").getOrCreate()
```

Create Spark Dataframe:

```python
iris_df = spark.read.csv("iris.data", inferSchema = True)
```

Output:

```
+---+---+---+---+-----------+
```

Rename all columns:

```python
iris_df = iris_df.select(col("_c0").alias("sepal_length"),
```

Output:

```
+------------+-----------+------------+-----------+-----------+
```

Transform the dataframe into a feature vector:

```python
vectorAssembler = VectorAssembler(inputCols = ["sepal_length", "sepal_width", "petal_length", "petal_width"], outputCol = "features")
```

Output:

```
+------------+-----------+------------+-----------+-----------+-----------------+
```

Convert the string species values into numeric values:

```python
indexer = StringIndexer(inputCol = "species", outputCol = "label")
```

Output:

```
+------------+-----------+------------+-----------+-----------+-----------------+-----+
```
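Because each block above is cut off after its first line, here is a condensed sketch of the whole preprocessing flow. The intermediate names viris_df and iviris_df are assumptions chosen to match the variable used in the Naive Bayes section below:

```python
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, StringIndexer

# Rename the generated _c0 ... _c4 columns to meaningful names
iris_df = iris_df.select(col("_c0").alias("sepal_length"),
                         col("_c1").alias("sepal_width"),
                         col("_c2").alias("petal_length"),
                         col("_c3").alias("petal_width"),
                         col("_c4").alias("species"))

# Assemble the four measurements into a single feature vector
vectorAssembler = VectorAssembler(
    inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
    outputCol="features")
viris_df = vectorAssembler.transform(iris_df)

# Convert the string species into a numeric label column
indexer = StringIndexer(inputCol="species", outputCol="label")
iviris_df = indexer.fit(viris_df).transform(viris_df)
```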
Naive Bayes Classification
Import some essential spark packages:
```python
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
```

Split the dataset into train and test datasets:

```python
splits = iviris_df.randomSplit([0.6, 0.4], 1)
```

Train the model using the Naive Bayes classifier and make predictions:

```python
nb = NaiveBayes(modelType = "multinomial")
```

Output:

```
[Row(sepal_length=4.5, sepal_width=2.3, petal_length=1.3, petal_width=0.3, species='Iris-setosa', features=DenseVector([4.5, 2.3, 1.3, 0.3]), label=0.0, rawPrediction=DenseVector([-10.3605, -11.0141, -11.7112]), probability=DenseVector([0.562, 0.2924, 0.1456]), prediction=0.0)]
```

Evaluate the accuracy:

```python
evaluator = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
```

Output:

```
0.5862068965517241
```
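Again, the training and evaluation blocks above are truncated; a minimal sketch of the steps that produce an accuracy figure like the one shown (the train_df and test_df names are assumptions):

```python
# 60/40 split with a fixed seed
splits = iviris_df.randomSplit([0.6, 0.4], 1)
train_df, test_df = splits[0], splits[1]

# Train a multinomial Naive Bayes model and score the held-out rows
nb = NaiveBayes(modelType="multinomial")
nb_model = nb.fit(train_df)
predictions_df = nb_model.transform(test_df)

# Accuracy is the fraction of test rows whose prediction matches the label
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
print(evaluator.evaluate(predictions_df))
```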
Multilayer Perceptron Classification
Import some essential spark packages:
```python
from pyspark.ml.classification import MultilayerPerceptronClassifier
```

Set the layers, train the model using the Multilayer Perceptron classifier, and make predictions:

```python
# A multilayer perceptron with 4 layers,
```

Evaluate the result:

```python
mlp_evaluator = MulticlassClassificationEvaluator(metricName = "accuracy")
```

Output:

```
0.9482758620689655
```
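A minimal sketch of that truncated block. The two hidden-layer sizes are illustrative guesses; what is fixed is that the input layer needs 4 units for the 4 features and the output layer 3 units for the 3 species:

```python
# A multilayer perceptron with 4 layers: 4 inputs, two hidden layers, 3 outputs
layers = [4, 5, 5, 3]

mlp = MultilayerPerceptronClassifier(layers=layers, seed=1)
mlp_model = mlp.fit(train_df)
mlp_predictions = mlp_model.transform(test_df)

mlp_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(mlp_evaluator.evaluate(mlp_predictions))
```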
Decision Trees Classification
Import some essential spark packages:
```python
from pyspark.ml.classification import DecisionTreeClassifier
```

Train the model using the Decision Tree classifier and make predictions:

```python
dt = DecisionTreeClassifier(labelCol = "label", featuresCol = "features")
```

Evaluate the result:

```python
dt_evaluator = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
```

Output:

```
0.9310344827586207
```
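A minimal sketch of the truncated training step, following the same fit / transform / evaluate pattern as above:

```python
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

dt_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                 metricName="accuracy")
print(dt_evaluator.evaluate(dt_predictions))
```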
Introduction to Regression
Regression techniques allow us to make predictions about numeric values. For example, if we have a product and the price of that product has been steadily rising over time, we might want to be able to estimate what the price will be in the future. Now we could look at prices over a period of time and try and fit a line to those price points over time. That line is useful because it goes out into the future and we can use it to make projections about what the price might be at some future point.
Pre-processing The Dataset
Import some essential spark packages:
```python
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
```

Create a spark session:

```python
spark = SparkSession.builder.master('local').appName("spark_basic").getOrCreate()
```

Read the CSV file:

```python
pp_df = spark.read.csv("power_plant.csv")
```

Output:

```
DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string]
```

Read the CSV file again correctly:

```python
pp_df = spark.read.csv("power_plant.csv", header = True, inferSchema = True)
```

Output:

```
DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]
```

Creating a feature vector:

Import some essential spark packages:

```python
from pyspark.ml.feature import VectorAssembler
```

```python
vectorAssembler = VectorAssembler(inputCols = ["AT", "V", "AP", "RH"], outputCol = "features")
```

Output:

```
[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26, features=DenseVector([14.96, 41.76, 1024.07, 73.17]))]
```
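The assembly step above stops at the VectorAssembler definition; a short sketch of applying it (the vpp_df name matches the variable used in the Decision Tree Regression section below):

```python
# Combine the four input readings into one feature vector; PE is the value to predict
vpp_df = vectorAssembler.transform(pp_df)
vpp_df.take(1)
```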
Linear Regression
Import some essential spark packages:
```python
from pyspark.ml.regression import LinearRegression
```

Train the model using Linear Regression:

```python
lr = LinearRegression(featuresCol = "features", labelCol = "PE")
```

Coefficients:

```python
lr_model.coefficients
```

Output:

```
DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])
```

Intercept:

```python
lr_model.intercept
```

Output:

```
454.6092744523414
```

Root Mean Squared Error:

```python
lr_model.summary.rootMeanSquaredError
```

Output:

```
4.557126016749488
```

Save the model:

```python
lr_model.save("lr1.model")
```
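The training block above is truncated before the fit call; a minimal sketch of how lr_model is obtained from the assembled vpp_df DataFrame:

```python
# Fit an ordinary least squares model predicting PE from the feature vector
lr = LinearRegression(featuresCol="features", labelCol="PE")
lr_model = lr.fit(vpp_df)

# Inspect the fitted model and its training error, then persist it
print(lr_model.coefficients)
print(lr_model.intercept)
print(lr_model.summary.rootMeanSquaredError)
lr_model.save("lr1.model")
```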
Decision Tree Regression
Import some essential spark packages:
```python
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
```

Split the dataset into train and test datasets:

```python
splits = vpp_df.randomSplit([0.7, 0.3])
```

Train the model and make predictions using Decision Tree Regression:

```python
dt = DecisionTreeRegressor(featuresCol = "features", labelCol = "PE")
```

Evaluate the result:

```python
dt_evaluator = RegressionEvaluator(labelCol = "PE", predictionCol = "prediction", metricName = "rmse")
```

Output:

```
4.459494278528065
```
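A minimal sketch filling in the truncated steps (the train_df and test_df names are assumptions):

```python
# Hold out 30% of the data for testing
splits = vpp_df.randomSplit([0.7, 0.3])
train_df, test_df = splits[0], splits[1]

dt = DecisionTreeRegressor(featuresCol="features", labelCol="PE")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

# Root mean squared error of the predictions on the held-out data
dt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
print(dt_evaluator.evaluate(dt_predictions))
```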
Gradient-boosted Tree Regression
Import some essential spark packages:
```python
from pyspark.ml.regression import GBTRegressor
```

Train the model and make predictions using Gradient-boosted Tree Regression:

```python
gbt = GBTRegressor(featuresCol = "features", labelCol = "PE")
```

Evaluate the result:

```python
gbt_evaluator = RegressionEvaluator(labelCol = "PE", predictionCol = "prediction", metricName = "rmse")
```

Output:

```
3.976988494544201
```
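A minimal sketch, reusing the same train/test split as the decision tree regressor above:

```python
gbt = GBTRegressor(featuresCol="features", labelCol="PE")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)

gbt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
print(gbt_evaluator.evaluate(gbt_predictions))
```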
Understand Recommendation Systems
A common problem in machine learning is making recommendations. There are two general ways of doing this. One is called collaborative filtering. Let's imagine you run an online bookstore with a number of customers, and these customers all like reading both the brown book and the red book. Now a new customer comes along and indicates that they really enjoyed reading the red book. What other books can we recommend to them? Definitely the brown book, since other people who have read the red book also enjoy reading the brown book. This is an example of collaborative filtering. Another way to make recommendations is based on the properties of the items that you're working with. For example, if we have a customer who really enjoys reading sci-fi, we might want to recommend other science fiction books to them, but not necessarily biographies.

Spark MLlib supports collaborative filtering, and it works by filling in something known as the user-item matrix. We can think of users as customers and items as books. In the example below, user one likes items one, two, and four. User two, or customer two, likes item two and item three. Notice that user four has something in common with both user one and user three, which means we probably want to recommend item two to user four. This is an example of collaborative filtering, and it is the type of recommendation system that Spark MLlib supports.
| | Item1 | Item2 | Item3 | Item4 |
|---|---|---|---|---|
| User1 | x | x | | x |
| User2 | | x | x | |
| User3 | x | | | |
| User4 | x | ? | | x |
Collaborative Filtering
Collaborative filtering follows the same patterns we’ve used repeatedly in this post.
First we start with preprocessing. We're going to use the alternating least squares method that's provided by Spark MLlib, and, to use that, we just import ALS from the pyspark.ml.recommendation package. Then we build a DataFrame of user-item ratings.
When it comes to modeling, we create an ALS object, and, when we do that, we have to specify the user, the item, and the rating columns in our DataFrame. Then we train the model using fit, which is a method of the ALS object. When it's time to evaluate, we create predictions using the transform method of the ALS model, applying it to our test data. We create a RegressionEvaluator object and use its evaluate function to calculate the root mean squared error, which gives us a measure of how well our collaborative filtering is making recommendations.
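A minimal sketch of that workflow, using a tiny made-up ratings DataFrame; the column names and the coldStartStrategy setting are illustrative choices, not from the original post:

```python
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# A small, made-up user-item ratings DataFrame
ratings_df = spark.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 0, 5.0), (2, 2, 1.0)],
    ["userId", "itemId", "rating"])

train_df, test_df = ratings_df.randomSplit([0.8, 0.2], seed=1)

# Alternating least squares; dropping cold-start rows avoids NaN predictions for unseen users/items
als = ALS(userCol="userId", itemCol="itemId", ratingCol="rating", coldStartStrategy="drop")
als_model = als.fit(train_df)
predictions_df = als_model.transform(test_df)

# RMSE measures how far the predicted ratings are from the actual ratings
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
print(evaluator.evaluate(predictions_df))
```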
Tips for Using Spark MLlib
Let’s review some tips for working with Spark MLlib.
There are three basic stages of building machine learning models. There’s a pre-processing phase where we collect, reformat, and transform the data. And once we have that data, we can build our models using a variety of machine learning algorithms. And then we want to make sure we evaluate our data to assess the quality of the models we built.
With that framework in mind, let’s look at some tips to make each of these stages go smoothly. First, when we’re pre-processing, we want to first load our data into DataFrames. If you’re working with text files, it helps to have headers or column names in the text file. When you read a file, make sure you use the inferSchema = True option. That’ll make sure that things like dates and numeric values get mapped to their appropriate data type. Use the VectorAssembler to create feature vectors and the StringIndexer to map from strings to numeric indexes.
During the model building phase, make sure to split your data into training and test sets. We use the training data to fit our models and then the test data to apply transformations to create predictions. When we're done building the models, we want to validate them. Using the MLlib evaluators is recommended. The two that we looked at were the MulticlassClassificationEvaluator and the RegressionEvaluator. Just be sure to use the right one for the kind of algorithm you're working with. Also, be sure to experiment with multiple algorithms. Once you've gone through the pre-processing phase, it's very easy to test other algorithms, so take advantage of that. Also, vary the hyperparameters for the algorithms you're working with. Sometimes you can get slightly better performance just by changing a hyperparameter.
Where do we go from here? Well first I’d recommend consulting the MLlib documentation. It’s really high quality documentation and it provides details on the APIs and includes extensive examples. When you’re ready to work with other data sets, look at the Kaggle website that has both machine learning data sets and articles about machine learning. Now Spark is designed for working with big data so if you’re ready to work with machine learning at big data scales, consult the AWS data sets. These are public data sets that are freely available from the AWS cloud service. Spark and MLlib are both under active development. So as you go forward working with MLlib, be sure to check back at the Spark MLlib website for updates and new features.