100 billion records later, refining our ETL service

We launched Stitch (then called RJMetrics Pipeline) into beta in October 2015, and since then we've processed over 100 billion records across tens of thousands of data streams. Earlier this month we took off the beta tag, released version 2 of our Redshift loader, and, last but not least, renamed our product and company to Stitch.

When we first launched I wrote an in-depth article about how the system works and some of the technical and product decisions that went into it. Many of those decisions turned out to be great, but some were dead wrong. Our system today looks dramatically different from the one I described in that article. Here’s what we learned that forced us to evolve:

Spark isn’t so simple

In our initial design, we planned to do some level of in-stream transformation, ranging from demultiplexing a mixed stream, to general data cleansing, to micro-batching, and — maybe down the line — user-defined transformations. In this design, horizontal scalability would be of primary importance; to meet increased load, we wanted to simply add machines. Apache Spark seemed to meet those needs: it supported stream-based processing of Kafka out of the box, and constructing a transformation pipeline was merely a matter of chaining functors together. So we went for it.
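
For illustration, here is a rough sketch of that style of pipeline in Python (simplified and hypothetical, not our production code): it reads the mixed Kafka topic with Spark Streaming's direct stream and chains a few transformation steps. The topic name, broker address, and helper functions are made up.

```python
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def clean_record(record):
    # Stand-in for general data cleansing.
    record["cleaned"] = True
    return record


def write_partition(records):
    # Stand-in for writing demultiplexed records to the output stream.
    for stream_id, record in records:
        pass


sc = SparkContext(appName="pipeline-sketch")
ssc = StreamingContext(sc, batchDuration=5)  # 5-second micro-batches

# Direct stream over the mixed input topic; each element is a (key, value) pair.
stream = KafkaUtils.createDirectStream(
    ssc, ["fat-queue"], {"metadata.broker.list": "kafka:9092"}
)

(stream
    .map(lambda kv: json.loads(kv[1]))        # parse each record
    .map(clean_record)                        # cleanse in-stream
    .map(lambda r: (r.get("stream_id"), r))   # key by destination stream
    .foreachRDD(lambda rdd: rdd.foreachPartition(write_partition)))

ssc.start()
ssc.awaitTermination()
```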

Fast-forward several months. We got past the initial roadblocks — keeping track of offsets, tuning the streaming parameters for the throughput we needed, and writing our code to avoid closures. We entered production with only a few glitches, the worst of which was duplicate records written to the output when batches had errors (fortunately, we were prepared to handle them downstream).

However, as load grew, we started to experience strange failure modes, most of which resulted in a growing backlog of unprocessed records. One cause of this behavior was that a Spark worker would simply stop running. The cluster would transparently pick up the load, but each job would take longer to run because the cluster had less capacity. This happened with surprising frequency, and without any obvious cause — the only hint was a warning in the Spark worker log indicating that the node had become disassociated. Moreover, even under normal conditions the system wasn't highly performant: when we profiled it, we found that the bulk of the time spent processing a batch went to coordination and data movement, most of which was unnecessary because our batches were small.

The final straw came one night when all of the workers shut down, claiming that they had an offset that was past the end of the Kafka topic. We looked at Kafka to see if there was a problem, and couldn't find one. We restarted the workers and they started from the last saved offset — but by that time there was a half-billion-record backlog, which took several days to work down.

So what was the alternative? Our needs were relatively straightforward: read a record from the input queue, apply a simple series of transformations to it, and write the resulting records to the appropriate output stream. It took us less than a week to develop and deploy a standalone replacement for Spark that uses Kafka's consumer to manage the assignment of topics to threads, and commits offsets only after the full series of transformation functions has been applied. Operating this system has cost a fraction of what the Spark cluster did, and it has required minimal maintenance.
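
Here is a rough sketch of that pattern in Python using kafka-python (hypothetical and simplified, not the production service): the consumer group handles assignment of topics and partitions, a chain of transformation functions runs over each batch, and offsets are committed only after the batch has been written out. The topic, group, and helper names are made up.

```python
import json

from kafka import KafkaConsumer

# Hypothetical transformation chain: cleansing, then a demultiplexing key.
transforms = [
    lambda r: {**r, "cleaned": True},
    lambda r: {**r, "stream_id": r.get("stream_id", "unknown")},
]


def write_out(records):
    # Stand-in for writing transformed records to the appropriate output stream.
    pass


consumer = KafkaConsumer(
    "fat-queue",
    bootstrap_servers="kafka:9092",
    group_id="pipeline-workers",
    enable_auto_commit=False,   # commit only after a batch succeeds
)

while True:
    batches = consumer.poll(timeout_ms=1000)
    for tp, messages in batches.items():
        records = [json.loads(m.value) for m in messages]
        for fn in transforms:
            records = [fn(r) for r in records]
        write_out(records)
    if batches:
        consumer.commit()  # offsets saved only once transforms and writes succeed
```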

I’m sure we could have dug deeper into Spark streaming to optimize and resolve the issues we were seeing, and I understand it has matured significantly in recent releases, but I’m still totally confident that scrapping it was the right move for us at the time.

Kafka is great, but not at managing topics

Our original architecture included two sets of queues: data would first enter the “Fat Queue” interspersed with all other data streams, and then it would be demultiplexed into a standalone topic on the “Thin Queue” before getting loaded into Redshift. The multitier design was intended to optimize for throughput and availability at the input source, while isolating the effects of load and downtime at the output targets — the Redshift instances that we do not directly control. In other words, we wanted to make sure that one customer’s huge data stream or Redshift outage didn’t affect any other customers.

This worked well at first, but as our data volume increased, the Thin Queue Kafka cluster ended up with tens of thousands of topics. This caused two problems: we had to run more Kafka brokers to make sure we didn't run out of file handles, and we spent a lot of processing time checking empty topics for new data. We found that many topics sat empty for long periods — because the data stream wasn't changing, or because a customer decided not to continue with a free trial — so we built a reaper to delete topics that had been idle for many days.
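
A reaper like that can be fairly simple. Here is a rough sketch (hypothetical, not the tool we actually ran) using kafka-python: it snapshots each topic's end offsets and deletes any topic whose offsets haven't moved in a week. The broker address, snapshot file, and idle threshold are made up, and a real version would keep its state somewhere more durable than a local file.

```python
import json
import time

from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient

IDLE_SECONDS = 7 * 86400          # reap topics idle for a week
BROKERS = "kafka:9092"
SNAPSHOT_FILE = "offset_snapshots.json"

consumer = KafkaConsumer(bootstrap_servers=BROKERS)
admin = KafkaAdminClient(bootstrap_servers=BROKERS)

try:
    with open(SNAPSHOT_FILE) as f:
        snapshots = json.load(f)  # topic -> {"offsets": {...}, "changed_at": ts}
except FileNotFoundError:
    snapshots = {}

now = time.time()
for topic in consumer.topics():
    partitions = [TopicPartition(topic, p)
                  for p in consumer.partitions_for_topic(topic)]
    end = consumer.end_offsets(partitions)
    offsets = {str(tp.partition): off for tp, off in end.items()}

    previous = snapshots.get(topic)
    if previous is None or previous["offsets"] != offsets:
        # Topic is new or has seen writes: remember when it last changed.
        snapshots[topic] = {"offsets": offsets, "changed_at": now}
    elif now - previous["changed_at"] > IDLE_SECONDS:
        admin.delete_topics([topic])  # reap the idle topic
        snapshots.pop(topic, None)

with open(SNAPSHOT_FILE, "w") as f:
    json.dump(snapshots, f)
```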

Unfortunately, we ran into a few cases where deleting topics caused the cluster to crash. And, even after we ran the reaper, the list of active topics was still long enough that we had trouble keeping up with it. We concluded that Kafka is not the right system for handling tens of thousands of low-volume topics, particularly when that list is growing and shrinking dynamically.

We have replaced the Kafka Thin Queues with a new system we have named “the Spool.” It buffers data into S3 in batches, and — since S3 doesn’t guarantee consistency — uses a transactional database to track the state of each batch file. Since S3 does all of the heavy lifting, this system is dramatically easier to maintain, and it’s cheaper to operate.
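
Here is a rough sketch of that idea in Python (hypothetical, not the Spool itself): each batch of records is written to S3 with boto3, and a transactional database (sqlite3 standing in for the real store) records the state of every batch file, so the loader never has to depend on S3 listings for correctness. The bucket, key layout, and table names are all made up.

```python
import json
import sqlite3
import uuid

import boto3

s3 = boto3.client("s3")
db = sqlite3.connect("spool.db")
db.execute("""
    CREATE TABLE IF NOT EXISTS spool_batches (
        batch_id   TEXT PRIMARY KEY,
        stream_id  TEXT NOT NULL,
        s3_key     TEXT NOT NULL,
        state      TEXT NOT NULL   -- 'uploaded' -> 'loaded'
    )
""")


def spool_batch(stream_id, records, bucket="spool-bucket"):
    """Write one batch of records to S3 and record its state transactionally."""
    batch_id = str(uuid.uuid4())
    key = f"spool/{stream_id}/{batch_id}.json"
    body = "\n".join(json.dumps(r) for r in records).encode("utf-8")
    s3.put_object(Bucket=bucket, Key=key, Body=body)
    with db:  # commit the state row atomically
        db.execute(
            "INSERT INTO spool_batches (batch_id, stream_id, s3_key, state) "
            "VALUES (?, ?, ?, 'uploaded')",
            (batch_id, stream_id, key),
        )
    return batch_id


def pending_batches(stream_id):
    """The loader asks the database, not S3, which batches still need loading."""
    return db.execute(
        "SELECT batch_id, s3_key FROM spool_batches "
        "WHERE stream_id = ? AND state = 'uploaded'",
        (stream_id,),
    ).fetchall()
```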

Views get a bad rap

Our initial release used database views as the primary interface for users to interact with their data. We stored data in tables in a hidden schema, with table and column names that weren't very user friendly, and then put a nicely named view on top. This let us encode bookkeeping metadata in the table and column names and, when necessary, rebuild tables in the background before transparently swapping them in behind the views. It gave us a clean separation between the private interface to the data and the public one our customers use, and we thought views would make the system more user friendly.
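
As a simplified illustration (generic Postgres-style SQL with made-up schema, table, and column names, not the DDL we actually generated), the pattern looked roughly like this: the hidden table carries bookkeeping in its names, the friendly view is what customers query, and a rebuilt table can be swapped in behind the view without customers noticing.

```python
import psycopg2

# Connection details are placeholders.
conn = psycopg2.connect("dbname=warehouse")

with conn, conn.cursor() as cur:
    cur.execute("CREATE SCHEMA IF NOT EXISTS _pipeline_hidden")

    # Hidden table whose names carry bookkeeping metadata.
    cur.execute("""
        CREATE TABLE IF NOT EXISTS _pipeline_hidden.orders__v1 (
            id__bigint BIGINT,
            total__fp  DOUBLE PRECISION
        )
    """)

    # The nicely named view that customers actually query.
    cur.execute("""
        CREATE OR REPLACE VIEW public.orders AS
        SELECT id__bigint AS id, total__fp AS total
        FROM _pipeline_hidden.orders__v1
    """)

# Later: rebuild the table in the background, then re-point the view in a
# single transaction so the swap is invisible to anyone querying the view.
with conn, conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS _pipeline_hidden.orders__v2 (
            id__bigint BIGINT,
            total__fp  DOUBLE PRECISION
        )
    """)
    cur.execute("""
        CREATE OR REPLACE VIEW public.orders AS
        SELECT id__bigint AS id, total__fp AS total
        FROM _pipeline_hidden.orders__v2
    """)
```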

Our users disagreed; they had an almost visceral negative response to views. Some wanted to tweak the underlying data model by adding custom encodings and sort keys, which the views made more difficult. Others looked at the view definition and fainted when they saw the ugliness underneath. Some, it seemed, didn’t know why they disliked the views, but they did.

We conducted dozens of interviews to try to distill out what exactly the problem was with views, but we never got a conclusive answer. That didn’t change the reality — everyone hated them. So, rather than try to swim upstream, we decided to scrap them. With this launch, all data streams get loaded directly into tables. While we don’t have a concise conclusion about why people disliked views, the overarching theme was that our customers want direct control over their data — and the views were an unwelcome layer of separation.

Not all APIs are RESTful

Unlike many of the SaaS companies we integrate with, we don’t (currently) have a REST API for exporting data about our system. But we do offer an API, in the form of the data we deliver into our users’ data warehouses. In fact, this API is the core of our product. Like any responsible API developer, we can’t go around breaking or changing it without taking the appropriate precautions to ensure downstream systems don’t break.

This release makes a breaking change in this API. We’ve changed from views to tables, made some column-naming edge cases more user-friendly (no more hashes!), and added some niceties, such as automatic widening of VARCHAR fields. Applications built on top of the old version aren’t guaranteed to work with the new version, and the migration path between the two will require maintenance on the data warehouse and any downstream applications.

We will be assisting customers with this upgrade to ensure it’s done correctly and with minimal disruption. But we think providing guarantees for datasets with evolving structures is a widespread problem, and we’re working on making this easier and safer in the future.

Conclusion

We’ve learned a lot in the past few months. We’re going to continue focusing on making business data accessible, and you can look forward to new data integrations, new warehouse targets, and increased transparency in the near future. Stay tuned for future blog posts about the lessons we learn in pursuit of these goals, and give Stitch a try if you haven’t yet — it’s free!