eHarmony Engineering

Lambda Architecture for Fault Tolerant Applications

May 1, 2016

The most heavily visited page on our site is the My Matches page, where users go to find out about the people we’ve identified as their best matches. The system that supports this use case is critical to the business. In this blog post, I’ll discuss the architecture of this system…

Vijay Vangupandu

Behind the scenes, all match information is handled by our match data service, which provides each user’s individual match feed with display-ready data about their matches. The data about each match must be kept up to date with new photos and any other new information the match has provided. The matches are generated by our matching system every day, based on fundamental personality compatibility as well as user preferences such as height and distance. The match data service is therefore crucial to our business. However, our existing system was not scaling to accommodate increased traffic, so we recently implemented a complete redesign. The redesigned system needed to be fault tolerant and to offer the best of both worlds by supporting two competing workloads:

  • batch processing
  • fast online updates

This post covers the design and the use cases of the new match data service.


The primary use case of the redesigned match data system is to serve each user aggregated data about their matches, and to maintain the life cycle of those matches. The match data service stores data aggregated from various services, and keeps it synchronized using events from the core services. The redesigned system has to support:

  • a batch system that processes the daily match events generated by the matching system, and handles user-generated events like photo uploads, name and address changes, or preference resets.
  • low-latency queries and updates from online users while handling a heavy load of offline event processing, with high availability and consistency.

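The Lambda architecture, proposed by Nathan Marz, satisfies all of these requirements. It does not literally break the CAP theorem, but it reduces the complexity of the trade-off between consistency and availability by treating incoming data as an immutable log that can be reprocessed at any time.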

Batch System

The batch system is responsible for processing the stream of events from the various origin systems and keeping the match feed in sync. It is designed to handle the burst of events from the matching system during our nightly matching run. The system keeps the aggregated feed in batch storage in sync with the data in the origin servers by listening to events and updating the store with the changes; for example, it propagates a user’s address changes, profile changes, or new photos into all of that user’s matches. The batch system consumes events from Kafka in windows and performs batch updates by user. Events are stored in Kafka for 4 days; should any failure occur, we simply replay the events. In effect, Kafka keeps events as an immutable log, and the batch system processes the log to update the storage.
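The windowed, per-user processing described above can be sketched roughly as follows. This is only an illustration in plain Python: the `Event` type, the `windows`, `apply_window`, and `replay` helpers, and the in-memory list standing in for the Kafka log are all hypothetical and are not taken from the production system.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """One immutable entry in the event log (hypothetical shape)."""
    offset: int     # position in the log
    user_id: str    # user whose data changed
    field: str      # e.g. "city" or "photo"
    value: str


def windows(log, size):
    """Yield consecutive slices of the log, each holding at most `size` events."""
    for start in range(0, len(log), size):
        yield log[start:start + size]


def apply_window(store, window):
    """Group a window's events by user, then apply one batch update per user."""
    by_user = defaultdict(dict)
    for event in window:
        by_user[event.user_id][event.field] = event.value  # later events win
    for user_id, changes in by_user.items():
        store.setdefault(user_id, {}).update(changes)


def replay(log, size=2):
    """Rebuild the store from scratch by reprocessing the whole log in order."""
    store = {}
    for window in windows(log, size):
        apply_window(store, window)
    return store


# A plain list stands in for the retained Kafka log (assumption for the sketch).
log = [
    Event(0, "u1", "city", "Austin"),
    Event(1, "u2", "photo", "p1.jpg"),
    Event(2, "u1", "city", "Boston"),
    Event(3, "u1", "photo", "p9.jpg"),
]
store = replay(log)
```

Because the log is never modified and events are applied in order, replaying it after a failure produces the same store regardless of the window size used.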

Batch Storage

For batch storage, we explored multiple NoSQL solutions and finally selected HBase: immediate consistency is a must, and HBase let us apply filters at the store level while handling a large data set. We chose Apache Phoenix as the query server on top of HBase, and we built an ORM library, PHO, to interact with HBase through Apache Phoenix. You can find the open source PHO project in our GitHub repository.

Speed Layer

The speed layer is responsible for handling user updates in the online system, such as sending a communication to another user, closing or archiving a match, and changing the state of a match. These changes must be propagated to the other user’s feed as well as to the logged-in user’s feed. We designed the speed layer so that updates are inserted into speed storage immediately, and the event is sent to the batch system for eventual synchronization with batch storage.

Speed storage holds only the deltas for the past “X” hours. At the moment we keep 2 hours’ worth of deltas, although most of the time our batch system processes them within a few minutes. This way, user updates are immediately available to the query layer, even when the batch system is handling huge loads during our match generation phase and takes a long time to sync these changes to batch storage.

We chose Redis for the speed layer. It is an in-memory store, so it is very responsive. On top of that, it provides a rich set of data structures, and we can auto-expire entries after 2 hours by setting a TTL. We store each user’s match deltas in a hash: the user ID is the key of the hash, and each field maps a match ID to the match object, which gives quick lookups during the merge phase in the query layer. We also enabled snapshotting for file-based persistence, and replication to slave nodes, in order to handle Redis node failures (we did not use the clustered version of Redis because of its immaturity at the time of this implementation).
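As a rough illustration of this layout, the sketch below models speed storage as a per-user hash with a time-to-live. It is an in-memory Python stand-in, not Redis client code; the `SpeedStore` class and its method names are hypothetical. It also assumes that the TTL applies to the whole per-user hash and is refreshed on every write, which the description above does not specify.

```python
import time


class SpeedStore:
    """In-memory stand-in for per-user hashes that expire after a TTL."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock      # injectable so expiry can be simulated
        self._hashes = {}       # user_id -> (expires_at, {match_id: delta})

    def _live_fields(self, user_id):
        """Return the user's hash if it has not expired, else None."""
        entry = self._hashes.get(user_id)
        if entry is None:
            return None
        expires_at, fields = entry
        if expires_at <= self.clock():
            del self._hashes[user_id]   # drop the expired hash
            return None
        return fields

    def put_delta(self, user_id, match_id, delta):
        """Store a delta under match_id and refresh the hash's TTL (assumption)."""
        fields = self._live_fields(user_id) or {}
        fields[match_id] = delta
        self._hashes[user_id] = (self.clock() + self.ttl, fields)

    def deltas_for(self, user_id):
        """Return a copy of all live deltas for a user, keyed by match ID."""
        return dict(self._live_fields(user_id) or {})


# Simulated clock so the 2-hour expiry can be shown without waiting.
now = [0.0]
store = SpeedStore(ttl_seconds=2 * 60 * 60, clock=lambda: now[0])
store.put_delta("u1", "m42", {"state": "ARCHIVED", "ts": 100})
fresh = store.deltas_for("u1")      # delta is visible immediately
now[0] += 2 * 60 * 60               # two hours later
expired = store.deltas_for("u1")    # delta has expired
```

Keying the deltas by match ID keeps the later merge step a simple dictionary lookup per match.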

Query Layer

The Query layer is responsible for serving user requests, such as:

  • serving the match feed to users to render their matches page
  • serving a match to render the match profile page to the user
  • displaying various counts such as “New”, “Communication” and “Archived”

The query layer is designed to serve low-latency queries; unlike the batch system, both the speed layer and the query layer are built for low latency. We achieve this by using reactive programming (RxJava) to issue requests to the speed and batch layers in parallel, and then merging the deltas from speed storage into the results from batch storage. Fetching data from speed storage in Redis is very quick, as it is in-memory. Fetching results from the batch layer can take much longer, especially when the data set is large; some users have thousands of matches in batch storage (i.e. a few MB of data each). To optimize further, we also parallelized the calls to batch storage by match state, and by various other means.

Once we have fetched data from both the speed store and the batch layer, we apply a merge strategy to combine the two results into the final result set returned to the user. Delta merging is tricky: sometimes it is as simple as merging by timestamp, and sometimes it is as complicated as merging various sections of the feed items (profile, photos, match, etc.). At this point we don’t have a requirement to merge at field level, but we do plan to implement it.

Finally, as the layer name suggests, we use a lot of queries and filters. We chose Apache Phoenix to abstract this layer; as mentioned in the Batch Storage section, we also implemented an open source ORM library on top of Phoenix.
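The parallel fetch and the simplest merge strategy (latest timestamp wins, per match) might look roughly like the sketch below. The production system uses RxJava; this Python version uses a thread pool purely for illustration, and `fetch_batch`, `fetch_speed`, `merge_by_timestamp`, `match_feed`, and the record shapes are hypothetical stand-ins.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_batch(user_id):
    """Stand-in for the slower batch storage query (hard-coded sample data)."""
    return {
        "m1": {"state": "NEW", "ts": 10},
        "m2": {"state": "NEW", "ts": 20},
    }


def fetch_speed(user_id):
    """Stand-in for the fast speed storage lookup (hard-coded sample data)."""
    return {
        "m1": {"state": "CLOSED", "ts": 5},      # older than the batch record
        "m2": {"state": "ARCHIVED", "ts": 35},   # newer than the batch record
        "m3": {"state": "COMMUNICATION", "ts": 40},
    }


def merge_by_timestamp(batch, speed):
    """Overlay speed deltas on batch results; the newer record wins per match."""
    merged = dict(batch)
    for match_id, delta in speed.items():
        current = merged.get(match_id)
        if current is None or delta["ts"] >= current["ts"]:
            merged[match_id] = delta
    return merged


def match_feed(user_id):
    """Query both stores concurrently, then merge the two result sets."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        batch_future = pool.submit(fetch_batch, user_id)
        speed_future = pool.submit(fetch_speed, user_id)
        return merge_by_timestamp(batch_future.result(), speed_future.result())


feed = match_feed("u1")
```

Here each delta replaces the whole record for a match. Merging individual sections of a feed item (profile, photos, match) would need a more detailed strategy than this sketch shows.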

Message Broker

The message broker is key to the Lambda architecture; this is where the actual decoupling happens. The message broker bridges the gap between the speed and batch layers. We chose Kafka as the message broker, as it is highly available: we run a 3-node cluster with a replication factor of 3. We keep logs in Kafka for 4 days and use them as the immutable data set for the batch layer. As mentioned in the Batch System section, in case of failure we replay the events to recalculate the feed.

[Figure: Lambda architecture diagram]

Fault Tolerance Scenarios

      1. Temporary failures in the batch layer do not cause a total outage of the site: we can still operate the site with deltas from the speed layer, and all updates from online user activity are retained in Kafka for 4 days.
      2. A temporary outage of speed storage causes only a few consistency issues, as the data is eventually synced to batch storage using events sent through Kafka.
      3. Kafka can tolerate up to “N-1” node failures with a replication factor of “N”. We have 3 nodes with a replication factor of 3; even if Kafka is completely down for a brief period, the events recorded in our event service logs can be resent.