Run the Apache Spark Logistic Regression algorithm

By Gloria Harper,2015-05-23 01:07
40 views 0
Run the Apache Spark Logistic Regression algorithm

    Run the Apache Spark Logistic Regression algorithm

    This article is to introduce using machine learning algorithms, to introduce the Apache Spark data processing engine.We will first make a brief introduction of the Spark at the beginning, then we will start to practice an example of machine learning.We will use the Qualitative Bankruptcy data sets from UCI machine learning data warehouse.Although the Spark support Java at the same time, Scala, Python and R, in this tutorial we will use the Scala as a programming language.Don't worry you are not using Scala's experience.Each code of practice, we will explain in detail again.


    Apache Spark is an open source framework of cluster computing, applications can be written in the Spark than Hadoop graphs model of high speed of more than 100 times.One of the main characteristics of the Spark, based on memory, running speed, not only that, complex applications run on the Spark system, is more effective than graphs based on disk.Spark is aimed at a more general, so it provides the following library:

    ; The Spark SQL, processing of structured data module

    ; MLlib, extensible machine learning repository

    ; API GraphX, figure and figure of parallel computing

    ; Spark Streaming, scalable, fault-tolerant flow calculation program

    As already mentioned, the Spark support Java, Scala, Python and R programming language.It also integrates other big data tools.In particular, the Spark can be run on a Hadoop cluster, can access any data source, including Hadoop Cassandra.

    The Spark core concepts

    In a high abstract level, a Spark of application by a driver as the entrance, run on a cluster of parallel operation.Driver contains your application's main function, and then assign the application cluster member.Driver by SparkContext object to access computing cluster.For the interactive shell applications, SparkContext can be accessed by sc variables by default.

    The Spark is a very important concept of RDD - elastic distributed data sets.This is a collection of immutable objects.Each RDD will be divided into multiple partitions, each partition may be involved in calculation on different cluster nodes.RDD can contain any type of Java and Scala object, Python or R, including user-defined class.RDDS production there are two basic ways: by loading distribution of external data set or a collection of objects such as a list or set.

    After creating the RDDs, we can do it for RDDs 2 different types of operations:

    ; Transformations - conversion operations, from a RDD into another


    ; Actions - action operation, through the RDD calculation results

    RDDs calculation by means of lazy - when RDDs encounter Action operation, will begin to calculate.Spark the Transformations of the operation, will be accumulated into a chain, only when the need data, can perform these Transformations.Each time the RDD for Action operation, RDD will regenerate.If you want to make some calculation results can be in the middle of the Action of other operating reuse, so you need to call the Spark of RDD. Persist () to store the data in the middle.

    Spark supports a variety of operating mode, you can use the interactive Shell, or run a standalone Spark program alone.No matter which kind of way, you will have the following workflow:

    ; Input data, used to generate the RDD

    ; Using the Transformations conversion data set operation

    ; Let the Spark keep some intermediate results, calculation for reuse

    ; Use Action operation, let the Spark parallel computing.The Spark

    inside will automatically optimize and run computing tasks.

    Install the Apache Spark

    In order to start using the Spark, it is necessary to begin withwebsiteDownload.Select the "Pre - built for the Hadoop 2.4 and later" version and then click the "Direct Download".If it is a Windows user, recommendations will beSparkThere is no space in the name of the folder.For example, extract the files to: C: \ spark.

    As mentioned above, we will use the Scala programming language.Into the Spark installation path, run the following commands:

    // Linux and Mac users


    // Windows users

    bin\spark shell

    Then you can see the Scala in the console:


    QUALITATIVE classification of bankruptcy

    The problem can be used in real life of machine learning algorithm to predict.We will try to resolve, through the qualitative information of a company, to predict whether the company will go bankrupt.The data set can be downloaded from UCI machine learning repository at Spark in the installation folder, create a new folder named playground.Copy qualitative_bankruptcy. Data. TXT file to it.This will be our training data.

    Data set contains 250 instances, of which 143 examples for the bankruptcy, 107 examples of bankruptcy.

    Each instance data format is as follows:

    ; Industrial risk

    ; To manage risk

    ; Financial flexibility

    ; credibility

    ; competitiveness

    ; Management risk

    These are known as the qualitative parameters, because they can't be expressed as a number.Each parameter can take the following values:

    ; P positive

    ; A average

    ; N negative

    Data sets the last column is the classification of each instance: B for bankruptcy or NB not bankrupt.

    In view of the data set, we have to train a model, it can be used to classify new data as an example, this is a typical classification problem.

    The steps to solve the problem are as follows:

    ; From qualitative_bankruptcy. Data. TXT file to read data

    ; Parse each qualitative values, and transform it into double

    value.This is our classification algorithm is needed

    ; The data set is divided into training and test data set

    ; Using training data model

    ; Calculate the training error of test data


    We will use the Spark of logistic regression algorithm training classification model.If you want to know more the principle of logistic regression algorithm, you can read the tutorial at

    The Spark of Scala Shell and paste the following import statements: import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS,


    import org.apache.spark.mllib.regression.LabeledPoint

    import org.apache.spark.mllib.linalg.{Vector, Vectors}

    This will import the required libraries.

    Next we will create a Scala function, the data set of qualitative data into a Double value.Type or paste the following code and press enter, the Spark Scala Shell. def getDoubleValue( input:String ) : Double = {

     var result:Double = 0.0

     if (input == "P") result = 3.0

     if (input == "A") result = 2.0

     if (input == "N") result = 1.0

     if (input == "NB") result = 1.0

     if (input == "B") result = 0.0

     return result


    If all run there is no problem, you should see output like this:

    getDoubleValue: (input: String)Double

    Now, we can read qualitative_bankruptcy. Data. Data from TXT file.From the point of view of the Spark, it is a Transformation operations.In this phase, the data is actually not to be read into memory.As mentioned earlier, this is a lazy way of execution.The actual read operations are triggered by count (), it is an Action.

    val data = sc.textFile("playground/")


    With our val keyword declare a constant data.It is a contains all the input data lines of RDD.Read operation is SC or sparkcontext context variables to monitor.The count operation should return the following results:

    res0: Long = 250

    Now it is time to prepare data for logistic regression algorithm, converts a string to a numeric.

    val parsedData ={line =

     val parts = line.split(",")

     LabeledPoint(getDoubleValue(parts(6)), Vectors.dense(parts.slice(0,6).map(x


    Here, we declare the another constant, named parsedData.For each row in a data variable data, we will do the following:

    ; Use ", "split string, and obtain a vector, named parts

    ; Create and return a LabeledPoint object.Each LabeledPoint contains

    vector of labels and values.In our training data, tags or categories

    (bankruptcy or not bankruptcy) in the last column, the array

    subscript 0 to 6.This is the parts we use (6).Before save the label,

    we will use getDoubleValue () function converts a string to

    Double.The rest of the values were converted to Double type number,

    and stored in a data structure called dense vector.It is also a Spark

    of logistic regression algorithm required data structure.

    Spark support map () conversion operations, Action is executed, the first is to map ().

    Let's look at the data, we have prepared using take () :


    The above code, to tell the Spark out 10 samples from the parsedData array, and print to the console.Same, take () before operation, and will perform the map ().The output is as follows:

    res5: Array[org.apache.spark.mllib.regression.LabeledPoint] =

    Array((1.0,[3.0,3.0,2.0,2.0,2.0,3.0]), (1.0,[1.0,1.0,2.0,2.0,2.0,1.0]),

    (1.0,[2.0,2.0,2.0,2.0,2.0,2.0]), (1.0,[3.0,3.0,3.0,3.0,3.0,3.0]),

    (1.0,[1.0,1.0,3.0,3.0,3.0,1.0]), (1.0,[2.0,2.0,3.0,3.0,3.0,2.0]),

    (1.0,[3.0,3.0,2.0,3.0,3.0,3.0]), (1.0,[3.0,3.0,3.0,2.0,2.0,3.0]),

    (1.0,[3.0,3.0,2.0,3.0,2.0,3.0]), (1.0,[3.0,3.0,2.0,2.0,3.0,3.0]))

    Then we divide the training data and testing data, will be divided into parsedData 60% of the training data, 40% were divided into test data.

    val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)

    val trainingData = splits(0)

    val testData = splits(1)

    Training data and test data can also be like above, use the take () the count () look at it.

    Exciting moment, we are now beginning to use Spark LogisticRegressioinWithLBFGS () to the training model.Set classification number, here is 2 (bankruptcy and the bankruptcy) : val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)

    When the model training, we can use the testData to examine error rate of the model. val labelAndPreds = { point =

     val prediction = model.predict(point.features)

     (point.label, prediction)

    val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble / testData.count

    Variable labelAndPreds save the map () conversion operations, map () to convert each row to binary group.Binary group contains the testData tag data (point label, classification data) and the classification of the predicted data (prediction).The model USES the point. The features as the input data.

    The last line of code, we use the filter () conversion operations and the count () action operation error rate to calculate the model.Filter (), keep forecast classification and the category inconsistent tuples.In Scala _1 and _2 can be used to access a tuple of the first element and the second element.Finally with forecast error divided by the number of testData the number of the training set, we can get the model error rate:

    trainErr: Double = 0.20430107526881722

    you have seen the Apache Spark can be used for machine learning tasks, such as logistic regression.Although this is not a distributed single environment of Scala shell demo, but the real strong Spark is distributed memory parallel processing ability.

    In the field of big data, the Spark is by far the most active open source project, in the past few years have been quick to gain attention and development.In the past few years.Interviewed more than 2100 respondents, all kinds of usage and environment.

Report this document

For any questions or suggestions please email