
Spark and Scala: a Compatible Match

January 7, 2016

At eHarmony, a critical piece of the engineering puzzle is finding the right tool for the job. Recently we rewrote part of our match notification email system (which delivers the profiles of new matches to our users). We implemented a set of tools to simplify the system and make it more efficient, utilizing Apache Spark and the Scala programming language.


About Apache Spark

Apache Spark (http://spark.apache.org/) is a general-purpose engine for processing data; it’s very fast and scales extremely well. Spark can run standalone, in a Hadoop environment, on Mesos, or in the cloud. Data can be accessed on HDFS, Hive, HBase, any Hadoop data source, or Cassandra. It provides simple functional concepts for dealing with data, and excels when dealing with large-scale data processing. Spark is able to utilize “in-memory” data efficiently, and typically performs much better than an equivalent MapReduce process. Spark has a vibrant community, and comes with Streaming, SQL (DataFrames), Machine Learning, and Graph components.

Key Concept: RDD

The key data abstraction in Spark is called an RDD, which stands for “Resilient Distributed Dataset”. Every Spark computation operates on and transforms RDDs. Data is read in, typically from a file or set of files, and transformed; the output is directed to other files, applications, jobs, etc.
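
As a minimal sketch of that read–transform–write flow (not from our system – the paths are illustrative and an already-created SparkContext named sc is assumed):

    import org.apache.spark.rdd.RDD

    // read data in from files, transform it, and write the result back out
    val lines: RDD[String] = sc.textFile("hdfs:///tmp/example-input")
    val upper: RDD[String] = lines.map(_.toUpperCase)
    upper.saveAsTextFile("hdfs:///tmp/example-output")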

Spark provides special functions for RDDs whose elements are Key/Value pairs; these enable joins with other Key/Value data sets, as well as subtracting, grouping, and reducing by Key.

These features proved particularly useful in the data-gathering phase for the emails we generate from our users’ new matches.
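
Again as an illustrative sketch (toy data, the same assumed sc, and the RDD import from above), the pair operations look like:

    val ages:  RDD[(Int, Int)]    = sc.parallelize(Seq((1, 34), (2, 28), (3, 45)))
    val names: RDD[(Int, String)] = sc.parallelize(Seq((1, "Ann"), (2, "Ben")))

    val joined    = ages.join(names)          // RDD[(Int, (Int, String))] – keys 1 and 2
    val unmatched = ages.subtractByKey(names) // RDD[(Int, Int)] – key 3 only
    val grouped   = ages.groupByKey()         // RDD[(Int, Iterable[Int])]
    val summed    = ages.reduceByKey(_ + _)   // RDD[(Int, Int)]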

About Scala

Scala (http://www.scala-lang.org/) is a JVM language that is strongly typed and supports a mixture of object-oriented and functional paradigms. While many features of Scala require a lot of experience with the language, a few simple features make it a nice complement to Spark. In fact, it is the recommended language for Spark, and actually comprises the largest component of the Spark implementation code base.  Check out the Scala tutorial for Java users: http://docs.scala-lang.org/tutorials/scala-for-java-programmers.html.

Key Concept: Tuple

As mentioned above, the Key/Value structure is important in dealing with multiple Spark data sets. Scala has a primitive concept called a Tuple that models this nicely. Tuples are designated by a simple syntax: (x, y). There is no keyword, no “new”, or anything of the sort. Simply wrapping values or expressions in parentheses is enough to designate a Tuple.
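
For example, a couple of Tuples built this way (the values here are arbitrary):

    val pair   = (1, "match")         // a Tuple2[Int, String] – just parentheses, no keyword
    val triple = (1, "match", true)   // a Tuple3[Int, String, Boolean]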

Scala provides numbered accessor methods on Tuples to get at the data (._1, ._2, ._3, …). These are very convenient for returning multiple values, or just associating two (or more) objects together. A common type of RDD with Tuples would therefore look something like:

val myData: RDD[(Int, String)] = …

This indicates that myData is an RDD containing Keys of type Int and Values of type String. If the RDD contained only String data, as a multi-line file might, then the type would simply be RDD[String].

Match

Pattern matching is another powerful Scala feature that proves useful in Spark applications. It allows Scala to conveniently deconstruct a piece of data, whether a Tuple, a value matched by its type, or an instance of a special class known as a “case class”.
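
Case classes don’t appear again in this post, so here is a tiny, purely illustrative example (the Pairing class is hypothetical) of deconstructing one with match:

    case class Pairing(userId: Int, matchedUserId: Int)

    def describe(p: Pairing): String = p match {
      case Pairing(user, matched) => s"user $user was matched with user $matched"
    }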

For instance, if we wanted to do something with the myData RDD mentioned above, we could work with it like this:

    myData.map(x => (doSomethingWithIntValue(x._1), doSomethingWithStringValue(x._2)))

This utilizes the built-in accessor methods.

On more complicated structures, this can become cumbersome. Matching syntax deconstructs and names the components:

    myData.map({ case (k,v) => (doSomethingWithIntValue(k), doSomethingWithStringValue(v))})

While this example looks more complicated, consider a Tuple that looks like:

(key, (val1, (data1, data2)))

Using Tuple accessors it would look something like:

    myData.map(x => (f(x._1), g(x._2._1), h(x._2._2._1), j(x._2._2._2)))

Hmm, how about the other way?

        myData.map({ case (k, (v, (d1, d2))) => (f(k), g(v), h(d1), j(d2))})

It is easy to imagine how the pattern matching approach can make the data much clearer.

Join

Here’s a practical example of how these two technologies benefited our match email system rewrite.

Consider a system that produces the following output: each user mapped to their unique set of new matches.

We can use an Integer (Scala Int) id for the user, and some MatchData as the data for the matches.

We will likely also need some information about the individual users, in addition to their matches, to generate and send an appropriate email. This user data can be stored as user id (Int) to UserData. We can then use the power of Spark to join the UserData and MatchData together, giving a more complete view of a particular user. The code, with the resulting RDD types, looks like this:

val userData: RDD[(Int, UserData)] = sc.objectFile(…)
val matches: RDD[(Int, MatchData)] = sc.objectFile(…)
val joinedData: RDD[(Int, (UserData, MatchData))] = userData.join(matches)
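
The UserData and MatchData types above are our own domain classes, which this post never spells out. As a purely hypothetical illustration (the real classes are far richer), minimal shapes that would make the remaining snippets concrete might be:

    case class MatchedUser(userId: Int)
    case class MatchData(getMatchedUsers: Seq[MatchedUser] = Nil)
    case class UserData(email: String = "", firstName: String = "")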

Double join

As seen above, joining individual users with their matches is accomplished with Spark’s simple join method.

Because the match data references other users whose data is also needed for further processing, we obtain that data by joining the matched users against the same UserData data set.

However, as discussed, Spark needs Tuples keyed by the same values to do this properly. In this case, the keys are user ids, so only one simple step is required. The “joinedData” has the match data available in the second element of its structure, and the matched users’ ids are part of that data. In fact, it’s the second element of the second element; in Scala’s Tuple syntax, the reference is ._2._2.

We can produce a new RDD that is match-oriented rather than user-oriented by a simple transformation of our joined data.

// Note: the second-element data (the original user’s info) will be repeated
// for users with more than one match – this is temporary…
val matchOriented: RDD[(Int, (MatchData, UserData, Int))] =
  joinedData.flatMap {
    case (userId, (userdata, matchdata)) =>
      matchdata.getMatchedUsers.map( // for each matched user
        user => (user.userId, (matchdata, userdata, userId)))
  }

The flatMap method is necessary because the inner map potentially produces multiple results for a single input element. flatMap flattens those results, so the resulting RDD contains one element per matched user rather than one List of map outputs per input element.
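
A toy illustration of the difference, unrelated to the match data (again assuming sc):

    val lines = sc.parallelize(Seq("a b", "c"))
    val mapped    = lines.map(_.split(" "))       // RDD[Array[String]] – still 2 elements
    val flattened = lines.flatMap(_.split(" "))   // RDD[String] – 3 elements: "a", "b", "c"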

In the end, our data set is keyed by the matched user’s id and includes the match data and the data (and id) of the original user they were matched to.

Now, we simply join this to the user data as we did before:

val matchedUserData: RDD[(Int, (UserData, (MatchData, UserData, Int)))] = userData.join(matchOriented)

Using the Data

Note that the type of this data set is getting quite verbose – it should be reduced now 😉.

Since we are likely going to operate on the original user data, we would like to have a data set that is keyed by the original user. Thus, we perform another mapping transformation:

val denormUserMatches: RDD[(Int, (UserData, MatchData, UserData))] =
  matchedUserData.map(x => (x._2._2._3, (x._2._2._2, x._2._2._1, x._2._1)))

// or, with match syntax
val denormUserMatches: RDD[(Int, (UserData, MatchData, UserData))] =
  matchedUserData.map({ case (matchId, (matchUserData, (matchData, userData, userId))) =>
    (userId, (userData, matchData, matchUserData))})

Simple, right?! The matched user’s id was discarded in this transformation. This was done for brevity, as it’s typically available in the retained UserData. The last thing we need, before running a magic “generate and send email” function across the data set, is to aggregate each user’s matches.

Remember that we started with some user data, and some match data for the user. Since a user can receive multiple matches in a day, the data set ultimately has many entries for a single user.

These need to be grouped or aggregated so that our final RDD looks like:

RDD[(Int, (UserData, List[(MatchData, UserData)]))]

This is the same as before, except we pull the MatchData and matched user data pairs together into a List. Again, Spark comes to the rescue with an RDD method called aggregateByKey.

val aggregatedUsers: RDD[(Int, (UserData, List[(MatchData, UserData)]))] =
  denormUserMatches.aggregateByKey((UserData(), List[(MatchData, UserData)]()))(
    // seqOp: fold one (userData, matchData, matchUserData) entry into the accumulator
    (acc, entry) => (entry._1, (entry._2, entry._3) :: acc._2),
    // combOp: merge the accumulators built on different partitions
    (a, b) => (a._1, a._2 ::: b._2))
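
To see aggregateByKey’s three pieces in isolation – a zero value, a per-partition seqOp, and a cross-partition combOp – here is a toy version on unrelated data (again assuming sc):

    val nums = sc.parallelize(Seq(("a", 2), ("a", 3), ("b", 5)))
    val countAndSum = nums.aggregateByKey((0, 0))(   // per key: (count, sum)
      (acc, v) => (acc._1 + 1, acc._2 + v),          // fold one value into the accumulator
      (x, y)   => (x._1 + y._1, x._2 + y._2))        // merge two partition accumulators

The match aggregation above has exactly the same shape; its accumulator simply carries the user’s UserData alongside the growing List of matches.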

This leaves us with a result that contains all the user data we may need, along with the corresponding matches and their data.

We now have a data set with enough information to do the necessary processing for email generation (or any other per-user function we might want to perform).
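
As a hedged sketch of that final step (sendMatchEmail is a hypothetical placeholder, not a function from our system; in practice something like foreachPartition would be used to batch connections):

    // hypothetical placeholder for the real email-generation logic
    def sendMatchEmail(userId: Int, user: UserData, matches: List[(MatchData, UserData)]): Unit = ()

    aggregatedUsers.foreach { case (userId, (user, matches)) =>
      sendMatchEmail(userId, user, matches)
    }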

Conclusion

In isolation, some of the syntax and concepts shown here can look complicated. However, with only a little exposure, they become very natural. Scala allows the Spark concepts to be coded almost as you would speak them: “join this with that”, “restructure that data so that the other piece is first”, or “pull these pieces together into a list.”

These single-line joins replace the complicated loops, lookups, and data collection typical of traditional systems. Generally, we talk about performing transformations on entire sets of data. Spark enables us to transform data more naturally, without the distracting loop iterations typically required to handle individual elements.