Boundary: not just for clusters

Posted on August 8th, 2012

At Boundary we talk a lot about visualizing distributed systems: Hadoop clusters, Riak nodes, and load balancers. But even small, single-node apps can create patterns in the network–patterns which can help you track down critical bugs.

Last night, an internal Postgres server locked up hard. Our monitoring systems detected the fault and paged our on-call staff, who discovered that the vacuum process was fighting to lock a table which was under heavy query load. In fact, query times had been rising linearly since the previous afternoon–and at a critical point, the database stopped responding altogether. When did the rise in query times start? Well, right after I deployed a new version of Joanna, a little internal webapp which maintains some cheap historical data in Postgres.

The key assumption here is “cheap”. I assumed that because the data involved was small, and because it passed load tests on my laptop, Joanna’s DB use wouldn’t be a dominant factor in load. Because I assumed it was small, I didn’t think carefully about the algorithmic complexity of its query patterns. Could those queries actually be a problem? I opened Boundary, cautiously restarted Joanna, and obliterated those assumptions:

A solid 5 Mbps of traffic to Postgres. This is Joanna checking whether it has data for each (org, time) pair by querying the DB for a list of orgs for every time slice separately. O(n*m). But wait! There’s more!

I didn’t know Postgres could do 150 Mbps. After acquiring a block of data for a given time, Joanna would write a record for each org–obtained by an O(n*m) process again.

In ten minutes I adjusted the queries to run in O(n) time–and took the time to streamline some other operations as well. Those changes cut traffic volume by more than two orders of magnitude, to 500 Kbps.
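To make the pattern concrete, here’s a minimal sketch of the per-pair existence check versus a single batched query. The table and column names are invented (the post doesn’t show Joanna’s schema), and SQLite stands in for Postgres:

```python
import sqlite3

# Hypothetical stand-in for Joanna's history table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE history (org TEXT, slice INTEGER, PRIMARY KEY (org, slice))")
conn.executemany("INSERT INTO history VALUES (?, ?)",
                 [("acme", 1), ("acme", 2), ("initech", 1)])

orgs = ["acme", "initech"]
slices = [1, 2, 3]

# O(n*m): one round trip per (org, time) pair -- the pattern that
# produced a solid 5 Mbps of chatter to the database.
def missing_pairs_slow(conn, orgs, slices):
    missing = []
    for org in orgs:
        for s in slices:
            row = conn.execute(
                "SELECT 1 FROM history WHERE org = ? AND slice = ?",
                (org, s)).fetchone()
            if row is None:
                missing.append((org, s))
    return missing

# The O(n)-query shape of the fix: fetch the existing pairs once,
# then do the set difference in memory.
def missing_pairs_fast(conn, orgs, slices):
    have = set(conn.execute("SELECT org, slice FROM history"))
    return [(o, s) for o in orgs for s in slices if (o, s) not in have]
```

Both return the same answer; the difference is n*m round trips versus one.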

Our slow query log didn’t reveal the problem because these queries (individually) are fast. It’s the aggregate behavior that caused a problem. Similarly, our Munin graphs for the DB node revealed the high query load, but couldn’t tell us what host it was coming from–or show anything about the short-lived traffic bursts. Identifying which part of the application caused each part of the traffic pattern required high-resolution time series.

That’s not to say that Boundary is suitable for all kinds of database problems. It can’t tell you about expensive but small queries, because their load is only evident inside Postgres. It also can’t visualize the internal behavior of an application on a single node–only its network I/O. But the instant you put the database and application on separate servers, you get a new view into the system as a whole. Together with logging aggregators like Splunk, and application-level metrics like New Relic, it’s a powerful exploratory tool which allows you to quickly test hypotheses about an app with instant feedback–from massive clusters to little webapps.


Streaming Failure Post-Mortem 7/31

Posted on August 1st, 2012

From approximately 8:00 AM PDT to 2:00 PM PDT, Boundary experienced intermittent downtime in the streaming system that resulted in the inability to accept data from customer meters and blank dashboards. The high-level overview is that a sharp spike in ingress customer data, combined with a suboptimal load balancing strategy, resulted in a set of cascading failures that proved extremely difficult to recover from.

At around 8:00 AM PDT we noticed a sharp uptick in customer traffic, approximately 60 Mbps worth, all added at once. We use global server load balancing (GSLB) as the primary load balancing / disaster recovery facility for meters (customer-installed agents) connecting to our collectors, the main point for ingestion of customer data. Unfortunately, when a large number of meters come up all at once, DNS caching on the customer’s network will ensure that all of these meters connect to the same collector machine. In this particular case, that one collector was running near its capacity and the extra load was enough to knock it offline.

The resulting spillover load when combined with the uneven load balancing resulted in a thundering herd situation. Collectors would start, operate for a few minutes, then buckle under the load.

Meanwhile, the instability and occasional unresponsiveness of the collectors was causing Phloem, our Kafka-powered messaging tier, to start having problems. The Phloems were doing blocking connection attempts in their main data consumption threads. When the collectors started becoming unresponsive, these connection attempts starved out the data consumption thread pool. This backed up the high volume of data waiting to be processed in the Phloem instance, causing OOM exceptions and necessitating killing the instances without a clean shutdown.

On restart, the Phloems brought up Kafka in recovery mode, which means a scan of all files on disk. Due to our particular use of Kafka with many topics, this expensive recovery operation caused restart times to stretch to 20 to 60 minutes. At no time were all Phloems down, and thanks to Ordasity, load shifted as needed. However, due to collector connection timeouts in the work claiming cycle, load shifted much more slowly than normal, also causing occasional downtime for some clients.

Specific Fixes

We implemented some band-aids to get the system back under control immediately. A set of HAProxy instances were deployed in front of the collectors, providing a more even balance of load across the cluster. We also cleaned up a number of obsolete topics in our Kafka instances, which helped with restart time. To improve robustness in the Phloems while the collectors recovered, we moved the connection code out of the main data consumption path and tuned timeouts and retry rates.

Longer term fixes include implementing more intelligence around connection logic in the meter. The meter should do some form of back-off when the collectors do not respond, so as to prevent the thundering herd issues seen when collectors recover. The meters should also be able to intelligently assign themselves to a collector such that a single customer’s traffic does not all end up on a single collector. This may take the form of consistent hashing against the DNS records for our collectors.
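As a sketch of what that meter logic might look like (hostnames and parameters are hypothetical, and the placement below is simple hashing rather than true consistent hashing, which would also minimize reshuffling when collectors come and go):

```python
import hashlib
import random

# Hypothetical DNS records for the collector fleet.
COLLECTORS = ["collector-1.boundary.com", "collector-2.boundary.com",
              "collector-3.boundary.com"]

def backoff_delay(attempt, base=1.0, cap=60.0):
    """Exponential back-off with full jitter: spreads reconnect attempts
    out in time so a recovering collector is not hit by every meter at
    once (the thundering herd)."""
    return random.uniform(0, min(cap, base * 2 ** attempt))

def pick_collector(meter_id, collectors=COLLECTORS):
    """Hash the meter id so one customer's meters spread across the
    fleet instead of piling onto a single cached DNS answer."""
    digest = hashlib.sha1(meter_id.encode()).hexdigest()
    return collectors[int(digest, 16) % len(collectors)]
```

The jitter matters as much as the back-off: without it, all meters that failed together retry together.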

Internally, in the interaction between the collectors and Phloem, we need to completely separate the data plane from the control plane. Separating the control plane and data plane is a standard practice for many high-throughput systems, and it is an architecture that we plan on moving towards for our internal message flow.
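A minimal sketch of that separation, assuming a hypothetical `open_connection` helper: connection attempts (control plane) get their own small pool with a hard timeout, so a hung collector can never starve the threads that drain data (data plane) — the failure mode Phloem hit:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

# Control plane: connection management on its own small pool.
control_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="control")
# Data plane: consumption threads that must never block on connects.
data_pool = ThreadPoolExecutor(max_workers=8, thread_name_prefix="data")

def open_connection(host):
    # Stand-in for a real (blocking) connect; hypothetical.
    return f"conn:{host}"

def connect(host, timeout=5.0):
    """Attempt a connection off the data path, bounded by a timeout."""
    future = control_pool.submit(open_connection, host)
    try:
        return future.result(timeout=timeout)
    except TimeoutError:
        future.cancel()
        return None  # data plane keeps draining; retry later
```

With blocking connects quarantined like this, an unresponsive peer costs at most a control-pool slot, not the consumption pipeline.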


Riak / Kobayashi Post-Mortem 7/31

Posted on August 1st, 2012

From approximately 12:50 AM to 6:00 AM PDT on July 31, Boundary experienced intermittent downtime of streaming and historical systems stemming from riak cluster instability. The tl;dr is that a configuration change intended to allow our riak nodes more breathing room with regard to open file limits caused intermittent ulimit kills. This partial failure cascaded to our streaming system and exposed a riak bug in which a cluster can become fragmented.

We use eleveldb as the storage backend in the riak cluster behind Kobayashi (our system for collecting and analyzing historical metrics). It is common for us to adjust the configuration of this cluster to better suit our workload. At around 12:30 AM PDT we rolled out a configuration change to the eleveldb backend upping its open file limit. The change was rolled out one node at a time on our 9-node cluster. Immediately following this change we noticed 99th percentile get and put latencies jump a little, but we have found this to be common following rolling restarts of the cluster. Soon thereafter we saw the first sign that there might be trouble–the riak-java-client within kobayashi (our application layer over riak for historical analytics) began intermittently losing connections to riak on localhost. At this time, latencies began peaking at 60 seconds for both gets and puts–the maximum allowed by riak_kv_put_fsm and riak_kv_get_fsm.

To make matters worse, around the same time we saw indications that riak cluster instability had cascaded, causing partial failures in the streaming system. A bit of back-story on how our streaming and historical systems interoperate is helpful in understanding this failure mode. Kobayashi persists historical network metrics in riak in blocks of 10 seconds, 10 minutes, and 10 hours for second-resolution, minute-resolution, and hour-resolution metrics, respectively. Upon taking on a particular organization as a unit of work, the streaming system loads state–the last block written for each query–so that we can begin writing into the last partially complete block. Since the streaming system was unable to load this state from kobayashi/riak during normal handoff operations, some orgs failed to start up and process data. The result of this failure is a dashboard which fails to show streaming metrics. Furthermore, writes to kobayashi from the streaming system during this time were timing out, causing bounded queues to back up and begin rejecting work. The end result of this additional failure mode is loss of some writes of historical data for many customers, even when the dashboard appeared fully functional.
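The block scheme above can be sketched as a keying function. Only the window sizes come from the description; the key shape itself is an invented illustration, not kobayashi’s actual format:

```python
# Fixed block windows: 10 s, 10 min, and 10 h, in seconds.
BLOCK_SECONDS = {"second": 10, "minute": 600, "hour": 36000}

def block_key(org, timestamp, resolution):
    """Map a sample to its block: the timestamp rounded down to the
    start of the enclosing window for the given resolution."""
    window = BLOCK_SECONDS[resolution]
    return (org, resolution, timestamp - timestamp % window)
```

Loading "the last block written" then amounts to fetching the block whose key covers the current time, which is why a riak outage at startup left orgs unable to resume mid-block.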

In the meantime we began digging deeper to get details on the overall health of the riak cluster and anything we could do to ameliorate the pain we were feeling. We noticed two things immediately:

  1. There were a large number of handoffs/fallbacks pending as per `riak-admin transfers`.
  2. Cluster members did not agree on vnode ownership as indicated by `riak-admin ringready`.

As for the first item, we have seen pending handoff operations coincide with similar latency issues in the past. Unfortunately, the only solution has been to wait. However, we were troubled at the lack of a discernible downward trend in pending handoff operations over time. This fact combined with the split-brain that had occurred (point #2) led us to question whether we should revert the configuration change that preceded this failure or whether cluster instability was simply a snowball effect initiated by the rolling restart.

Further investigation into the riak logs showed that the parent process was periodically being killed due to ulimit issues. The configuration change we made allowing eleveldb more open files permitted the erlang vm as a whole to attempt opening more file descriptors than allowed by ulimit -n. This was made worse by vnode multiplexing and handoff-induced vnode motion around the cluster. Reverting the eleveldb change allowed the cluster to eventually stabilize.
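A rough, illustrative calculation shows how a per-backend setting multiplies into a VM-wide descriptor demand. All the numbers here are invented for the example (ring size, per-vnode file limits, overhead); the mechanism is what matters:

```python
def fd_demand(ring_size, nodes, max_open_files, overhead=256):
    """Approximate file descriptors the whole Erlang VM may need:
    eleveldb's open-file limit applies per vnode, so it multiplies by
    the vnodes each node hosts (more during handoff/fallback), plus a
    flat allowance for sockets and log files."""
    vnodes_per_node = ring_size // nodes
    return vnodes_per_node * max_open_files + overhead

# e.g. a 256-partition ring on 9 nodes is ~28 vnodes per node.
# Raising max_open_files from 20 to 64 pushes demand from 816 to 2048
# descriptors -- sailing past a default `ulimit -n` of 1024.
```

The point is that a seemingly local tuning knob must be budgeted against the process-wide ulimit, and the budget shrinks further when fallback vnodes land on a node.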

However, riak was still fragmented as one node of the nine had formed a ring of its own. This turned out to be caused by a race condition in riak itself in which it becomes possible for a cluster member to leave and form a ring of one. While the window of time in which this bug may be exposed is exceptionally small, it is a condition that arises on node startup. Because nodes were dying and coming back online somewhat regularly, we had a greater probability of hitting that race condition–which we did. The folks at Basho were great and helped us troubleshoot and rejoin this orphaned node to the rest of the cluster.

There are some lessons to be learned here. In addition, we can extrapolate from these experiences about tooling that would have helped us to recover from this problem more quickly or prevented it entirely.

Our primary method of solving this problem was grepping through log files and using riak administration tools to get instantaneous snapshots of cluster state. A helpful tool for determining handoff activity contributing to cluster instability is `riak-admin transfers`. While this is useful, it doesn’t give the whole picture–we have to dig through log files for clues. Worse still, this particular tool blocks the very handoff operations being monitored so it must be used sparingly.

With a large number of machines and applications in play, it can be difficult to find important, low-frequency log events in the midst of unimportant, high-frequency ones. Tools that help consolidate and visualize logs emitted by distributed systems would potentially be of great help in these situations and we will continue looking into them.

Furthermore, we will take greater care to roll out configuration changes like this in the future. While we did take our time during the rolling update, we now have a better idea of how long to wait between riak restarts and how to evaluate whether a change was successful.

The good news is that most data was still captured during this time and only a subset of customers were affected by blank dashboards. But events like this remind us of the importance of resilient, operations-friendly engineering and help us to build better systems. While the source of this incident was a seemingly minor configuration change, the results had unexpected, cascading impact. We are forced to reason through these failure scenarios and this makes us better-equipped to accommodate partial failures in the future. Our mental model of how our infrastructure fits together, and how it breaks, is enhanced. We will continue using this experience to the benefit of everyone who uses Boundary.


Whoa… now THAT’S cool!

Posted on July 20th, 2012

Hello everyone,

I’ve got a couple of updates this week of July 15th and some very cool shots of Application Visualization:

  • Application Visualization Update
  • Windows Meter Beta (available now)
  • Boundary on Google Compute Engine (beta)

Application Visualization
Application Visualization (AppVis) is really coming along – thanks to your feedback.

Check out AppVis on my big data cluster (which I lovingly call Large Marge*).  Now – my cluster spans multiple availability zones (for redundancy) and the processes that load data into the cluster are both in Rackspace and our own internal network.

With AppVis, you can view the communication flows of your application at the 30,000 foot level.

Collapsed View of Big Data App

Then, you can drill into the big data cluster portion and show that service “expanded out”.   You completely control the level of granularity you get into, e.g. my cluster is only 3 workers, but if you had a 200 node cluster, you could have an intermediary expanded level that groups nodes by what racks they’re in (or by availability zones, or whatever).   You can quickly see that this allows you to view at the highest level, then drill into the next level of detail, then drill into a further degree of detail, and so on.
Expanded Big Data Application

When you drill into the detailed traffic of the cluster, you can see the one way direction of replication happening between nodes as each node copies data to its redundant copy on a separate node.

Not only that, but now you can also click on the links to see WHAT KIND of traffic is traversing – check this out:

AppVis Traffic View

Note: The diagram is refreshed with new data every time you refresh the browser.  Yes, right now you have to manually refresh the browser to get new data, but by Monday we’ll have continuous discovery deployed so that the visualization is updated with changes in traffic flows every few seconds (automatically).

Because my big data application spans multiple availability zones and different data centers, I can also visualize the application from a “location” perspective:
Regions Visualization

If you haven’t asked to have this feature enabled on your account, drop us a note at service@boundary.com and we’ll turn it on for you.  This has never been possible in public cloud environments, so if you’re running your application in the public cloud, you’ve gotta try it!

Windows Beta Support (Available Now!)

The Windows Beta is in full swing – we have several customers running the meter and having a great experience.  If you are interested in trying the Boundary Meter on Windows, please let us know at service@boundary.com.  We expect it to be generally available (GA) within the next three weeks.

Boundary now available in Google Compute Engine (Beta)

We’ve been running in Google Compute Engine (GCE) and it appears to be working fine.   So, if you’re part of the beta, give it a shot.   Enjoy using Boundary in GCE to monitor your applications and troubleshoot problems – let us know if you see something interesting!

That’s all for this week, folks.  Have a great weekend -


* LargeMarge is a reference to this scene in….   which movie?  ;)
