At Distil we use a variety of detection mechanisms to find bad bots. These range from heuristics developed over many years of combined experience by our security experts, to explicit bad behaviors (like high-rate requests and suspicious data center traffic), to implicit bad behaviors that we learn from our global traffic. This latter category, implicit bad behavior detection, is handled by our Machine Learning feature. Recently we undertook a large engineering effort to rebuild this product's infrastructure to make it faster, easier to maintain, and more useful to our data science team for quickly iterating on new ideas and testing detection performance.
Our old machine learning system began as an extension of our initial research, where we used batch systems to search our logs for signals that identify bad actors. Most of the heavy-duty data crunching during that research was handled by Hive and Impala running on AWS EMR, so it made sense to build the production system on the same foundation.
Our logs were archived to AWS S3, and Data Science could easily create a cluster of worker machines, so the prototype system could be built with DS resources alone instead of requiring external engineering support. This was both a blessing and a curse: while we try to maintain high engineering standards in our Data Science group, there is a certain amount of technical debt you’re bound to inherit when researchers build production systems on a tight timeframe.
Image 1 - Legacy Machine Learning System
The system was a hybrid of Python and Impala (http://impala.apache.org/). Python orchestrated the batch loading of new logs from S3 and the creation and execution of Impala SQL queries. These SQL queries would load the new data and compute the updated features for all of the users on our global network. This was a computationally intensive task, requiring many features to be computed for tens of millions of users on each iteration.
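As a rough illustration of that orchestration step, the sketch below builds the kind of Impala feature query the Python layer generated and executed each batch. The table and column names here are hypothetical, not our actual schema.

```python
# Hypothetical sketch of the legacy batch orchestration: Python renders an
# Impala SQL statement that recomputes per-user features over a newly
# loaded batch of logs. Table and column names are illustrative only.

FEATURE_QUERY_TEMPLATE = """
INSERT OVERWRITE TABLE user_features
SELECT
    user_id,
    COUNT(*)                  AS request_count,
    COUNT(DISTINCT client_ip) AS distinct_ips,
    AVG(response_time_ms)     AS avg_response_time
FROM weblogs
WHERE log_date = '{log_date}'
GROUP BY user_id
"""

def build_feature_query(log_date):
    """Render the feature-aggregation query for one batch of logs."""
    return FEATURE_QUERY_TEMPLATE.format(log_date=log_date)

# The real system would hand this string to an Impala client for execution.
query = build_feature_query("2016-04-01")
```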
After the new features were computed and stored in HDFS, the user:feature pairs were streamed to a classifier process using Hadoop streaming. This allowed us to preserve the classifiers developed during research without having to reimplement and test them in a different language. The classification process was fairly performant thanks to powerful Python-based numerical computing tools like NumPy.
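A minimal sketch of what such a Hadoop-streaming classifier looks like, assuming a toy logistic model and a tab-separated `user<TAB>features` input format (both illustrative, not our production model):

```python
import math
import sys

# Illustrative weights for a toy logistic model; the real classifiers were
# developed during research and are considerably more involved.
WEIGHTS = [0.8, -0.3, 1.2]
BIAS = -0.5

def score(features):
    """Logistic-regression probability for one user's feature vector."""
    z = BIAS + sum(w * x for w, x in zip(WEIGHTS, features))
    return 1.0 / (1.0 + math.exp(-z))

def main(lines):
    """Hadoop-streaming style: 'user<TAB>f1,f2,f3' in, 'user<TAB>prob' out."""
    for line in lines:
        user, raw = line.rstrip("\n").split("\t")
        features = [float(v) for v in raw.split(",")]
        yield "%s\t%.4f" % (user, score(features))

if __name__ == "__main__":
    # Hadoop streaming pipes records through stdin/stdout.
    for out in main(sys.stdin):
        print(out)
```

The stdin/stdout contract is what lets Hadoop streaming run an arbitrary Python process as a map step without any Java glue.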
Finally, the classified users’ output probabilities were stored in HDFS and then sent to S3 for other systems to ingest and act upon the new user scores.
Problems with the Legacy System
While this system served us well for testing and evaluating the effectiveness of our Machine Learning product, there were several disadvantages to this approach.
- The system was custom-built for a specific classification process; adding new features, changing the models, or otherwise modifying the setup was a non-trivial endeavor.
- The system required direct engineering input from the Data Science group to maintain, modify, and support, which made it hard to scale our processes and to offload system support to our Ops and Data Engineering teams.
- The system had high reaction latency due to the large batch size and the serial nature of its processes.
New System Requirements
Given the limitations of the Legacy System we decided it was time to rebuild with the following requirements:
- Lower latency, so we can react more quickly to user and bot behaviors.
- Scalable, so that we could focus on research rather than constantly monitoring the load on the system. We also wanted to anticipate future business growth and build the system in a way that wouldn’t require revisiting its architecture for a while.
- Move as much of the enterprise software engineering as possible outside the scope of our Data Science team’s responsibilities. Writing and maintaining that kind of software wasn’t feasible for the team on top of the daily Data Science analysis and R&D work. Almost all of the DS research, prototyping, and analysis is done in Python, while the rest of the engineering teams use a variety of languages based on each system’s performance needs.
- Well-defined, but flexible, interfaces between the Data Science and non-Data Science portions of the system. This was critical for reducing the engineering needs of the Data Science team and for letting sub-systems, potentially written in different languages, interface cleanly.
Building the New System
In order to satisfy the requirements listed above, we decided on three main technologies to add to our existing stack: Storm for distributed, streaming computation; Kafka for distributed message delivery; and Protocol Buffers for enforcing datatypes and message structure. We also used Redis for aggregating user state (classification features); Redis was already used for this purpose in our network appliance processing systems, but we expanded its use for Distil Machine Learning.
Protocol buffers (https://developers.google.com/protocol-buffers/) allow us to move logs from the network edge nodes, where our customers’ traffic is handled, to our data center, and then into the data warehouse and data science processing infrastructure, all without having to enforce datatypes or message field structures at every junction. Protocol buffers also offer code generation for Python, Java, and Go, the three main languages used in our logging / classification pipeline. We maintain a central repository of our log protocol buffer definitions, and all teams exchanging these messages simply check out and compile the current definition of the message structure. Protobufs are also backward compatible and allow for optional fields, so adding new fields to the message structure won’t break downstream processing.
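For illustration, a hypothetical excerpt of such a centrally maintained log definition might look like the following; the message and field names are invented for this example:

```protobuf
// Hypothetical log message definition. Because every field is optional,
// consumers compiled against an older version of this file simply ignore
// fields they don't know about, which is what keeps downstream
// processing from breaking when new fields are added.
syntax = "proto2";

message WebLog {
  optional string user_id      = 1;
  optional string client_ip    = 2;
  optional int64  timestamp    = 3;
  optional string request_uri  = 4;
  // Added in a later revision; older consumers are unaffected.
  optional float  threat_score = 5;
}
```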
The second major addition to our stack was Kafka (https://kafka.apache.org/), which we use to move logs and messages in a distributed, fault-tolerant, and recoverable way. Customer logs are used for many purposes at Distil Networks, including:
- Customer reporting in the Portal
- Support investigations
- Analysis by our Professional Services team (https://www.distilnetworks.com/analyst-managed-service/)
- R&D by the Data Science team
As such, it’s important to have a system that allows for many writers (our globally distributed customers’ servers) and many readers (data warehousing, reporting, and stream processing like Distil Machine Learning). Essentially, among other things, Kafka provides the interface between the Storm Machine Learning system that Data Science maintains and the streaming log aggregation services that our Data Engineering team provides.
Image 2 - System Framework for Separating ML Engineering / Data Science from Data Engineering
Kafka gives us:
- Very high throughput
- Cross-language support
- Multiple producers and consumers
- Data replication and fault tolerance
Storm / Streamparse
The third major addition to our stack was Apache Storm (http://storm.apache.org/), primarily utilizing the Streamparse (https://github.com/Parsely/streamparse/) project from Parse.ly (https://www.parsely.com/). Storm allows the Data Science team to consume both log messages and any aggregations provided by Data Engineering to create a distributed graph of actions on this data. This is how we provide the Distil Machine Learning service. We read aggregated features about users on the network and pass them to Storm bolts for classification and scoring. The scores are then produced back to a new Kafka topic, which is then returned to the Edge Nodes for action. This process can be thought of as an idempotent scoring mechanism provided by Data Science — aggregated information enters the Storm cluster, and threat scores leave the cluster. Having defined interfaces at the entrance and exit of the cluster frees Data Science to quickly iterate on new ideas without having to explicitly work with the broader engineering group to create new systems. We were able to replace the Python-based orchestration and the Impala-based feature aggregation of the legacy system with Storm and explicit feature engineering by the Data Engineering team.
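Stripped of the Storm plumbing, the scoring step inside such a bolt boils down to something like the sketch below. The threshold, feature names, and toy model are all illustrative; in the real topology this logic lives in a streamparse `Bolt` subclass whose output is produced back to the scores Kafka topic.

```python
# Illustrative threat-score cutoff; not our production threshold.
THRESHOLD = 0.5

def classify(user_id, features, model):
    """Score one user's aggregated features.

    Returns (user_id, score, is_bot) -- the tuple a bolt would emit
    toward the output Kafka topic.
    """
    score = model(features)
    return (user_id, score, score >= THRESHOLD)

# A stand-in for a trained classifier loaded at bolt startup;
# here it just flags users seen on many distinct IPs.
def toy_model(features):
    return min(1.0, features.get("distinct_ips", 0) / 10.0)

result = classify("user-42", {"distinct_ips": 8}, toy_model)
```

Keeping the scoring function pure like this is also what lets us unit-test classifiers outside the cluster before deploying them into the topology.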
Image 3 - Machine Learning in Storm System Diagram
The Data Science group at Distil primarily works with Python, which is why we chose Streamparse to interface with Storm. We are able to load our Python-based machine learning classifiers directly into our Storm bolts and score users the same way we do in R&D. We also use Datadog for recording application metrics and monitoring some algorithm performance metrics.
Storm gives us:
- Multi-language support
- Generic distributed streaming compute
Image 4 - Example of Datadog application metric dashboard for the Distil ML product
Finally, in order to transform the high-rate streaming weblogs into user aggregations (e.g. turn web logs into a streaming list of users and how many IPs they’ve used in the last 4 hours), our Data Engineering team uses Java and Redis for real-time state storage and computation. We decided that this would be done external to Storm for the time being, as Data Engineering provides this service at a much higher throughput (roughly 100x) than the user classification scoring requires.
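To make the aggregation concrete, here is a pure-Python sketch of the distinct-IP example above. In production this state lives in Redis and is maintained by Java services; the class below only illustrates the windowed counting.

```python
import time

WINDOW_SECONDS = 4 * 60 * 60  # the 4-hour window from the example above

class IpWindowAggregator:
    """Track how many distinct IPs each user has used in the last 4 hours.

    A toy stand-in for the Redis-backed state our Data Engineering
    services maintain; the interface and names are illustrative.
    """

    def __init__(self):
        self._seen = {}  # user_id -> {ip: last_seen_timestamp}

    def observe(self, user_id, ip, now=None):
        """Record one (user, ip) sighting; return the current distinct count."""
        now = time.time() if now is None else now
        ips = self._seen.setdefault(user_id, {})
        ips[ip] = now
        # Evict IPs not seen within the window.
        for stale in [i for i, t in ips.items() if now - t > WINDOW_SECONDS]:
            del ips[stale]
        return len(ips)

agg = IpWindowAggregator()
agg.observe("u1", "10.0.0.1", now=0)
count = agg.observe("u1", "10.0.0.2", now=60)
```

The streaming output of counts like `count` is what the Storm bolts consume as classification features.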
With the combination of these systems, fault recovery becomes a much less painful process than in the previous system. Each Kafka topic has its own retention time, so if any piece of the system fails, logs are still stored upstream of that sub-system for a period of time. The high throughput of Kafka and Redis allows us to back-process data if necessary. Finally, Storm monitors its processes and restarts them on a different compute node in the event of a failure. The image below shows what happened when Storm detected a failed hypervisor and shifted the Machine Learning computations to other supervisors. What would have caused major downtime in the previous system caused less than a minute of downtime in the current one:
Image 5 - Uh oh … there goes a hypervisor. Storm to the rescue!
The color change in the image above indicates where processing moved from the dead Storm supervisor to working supervisors. The red bars show the Kafka consumers automatically rebalancing to compensate for the lost processing.
Benefits to Customers
While engineering better systems is certainly personally satisfying to us as engineers, ultimately the best benefit is to our customers. This new system benefits them in several ways:
- We’re able to provide faster decisions about users. We have cut the judgment latency significantly: from the time the logs arrive in the data center, the Machine Learning system scores the user within a few seconds.
- We’ve increased the stability and uptime of the system by using scalable systems designed to grow and expand with our data volume.
- We’ve made it much easier for our Data Science team to quickly test out and iterate on new ideas and algorithms thereby decreasing the time it takes to move from research to development to production.
In summary, we decided it was time to upgrade the Distil Machine Learning infrastructure from an AWS EMR-based monolith to a distributed Kafka- and Storm-based computing system in our own data center. In the process, we also re-engineered our log processing pipelines so that Kafka is used throughout the system to exchange protocol buffer-based messages. Data Science’s primary responsibility shifted to a Storm / Streamparse-based compute cluster, while the high-throughput aggregation and user-state computations (features, for those in the Machine Learning world) were taken over by our Data Engineering team. In doing so we’ve increased uptime and reliability, reduced latency, and increased flexibility for future R&D.
About the Author: William Cox