
Design patterns in orchestrators: transfer of desired state (part 3/N)

Most datacenter automation tools operate on the basis of desired state. Desired state describes what should be the end state but not how to get there. To simplify a great deal, if the thing being automated is the speed of a car, the desired state may be “60mph”. How to get there (braking, accelerator, gear changes, turbo) isn’t specified. Something (an “agent”) promises to maintain that desired speed.


The desired state and changes to the desired state are sent from the orchestrator to various agents in a datacenter. For example, the desired state may be “two apache containers running on host X”. An agent on host X will ensure that the two containers are running. If one or more containers die, then the agent on host X will start enough containers to bring the count up to two. When the orchestrator changes the desired state to “3 apache containers running on host X”, then the agent on host X will create another container to match the desired state.
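The container example can be sketched as a small reconcile function. This is only an illustration: the `reconcile` function, the `host_x` set and the generated container names are hypothetical stand-ins for a real agent and container runtime.

```python
from itertools import count

_ids = count(1)  # hypothetical container-ID generator


def reconcile(running: set, desired_count: int) -> None:
    """Start or stop containers until len(running) == desired_count."""
    while len(running) < desired_count:
        running.add(f"apache-{next(_ids)}")  # stand-in for "start a container"
    while len(running) > desired_count:
        running.pop()                        # stand-in for "stop a container"


host_x = set()
reconcile(host_x, 2)   # desired state: two apache containers
host_x.pop()           # one container dies (drift)
reconcile(host_x, 2)   # the agent restores the count
reconcile(host_x, 2)   # repeating the same desired state changes nothing
reconcile(host_x, 3)   # the orchestrator raises the desired state to 3
print(len(host_x))     # 3
```

The function only looks at the gap between actual and desired state, so calling it repeatedly with the same desired state is harmless.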

Transfer of desired state is another way to achieve idempotence (a problem described here)

We can see that there are two sources of changes that the agent has to react to:

  1. changes to desired state sent from the orchestrator and
  2. drift in the actual state due to independent / random events.

Let’s examine #1 in greater detail. There are a few ways to communicate the change in desired state:

  1. Send the new desired state to the agent (a “command” pattern). This approach works most of the time, except when the size of the state is very large. For instance, consider an agent responsible for storing a million objects. Deleting a single object would involve sending the whole desired state (999999 items). Another problem is that the command may not reach the agent (“the network is not reliable”). Finally, the agent may not be able to keep up with the rate of change of the desired state and may start to drop commands. To fix this issue, the system designer might be tempted to run more instances of the agent; however, this usually leads to race conditions and out-of-order execution problems.
  2. Send just the delta from the previous desired state. This is fraught with problems. This assumes that the controller knows for sure that the previous desired state was successfully communicated to the agent, and that the agent has successfully implemented the previous desired state. For example, if the first desired state was “2 running apache containers” and the delta that was sent was “+1 apache container”, then the final actual state may or may not be “3 running apache containers”. Again, network reliability is a problem here. The rate of change is an even bigger potential problem here: if the agent is unable to keep up with the rate of change, it may drop intermediate delta requests. The final actual state of the system may be quite different from the desired state, but the agent may not realize it! Idempotence in the delta commands helps in this case.
  3. Send just an indication of change (“interrupt”). The agent has to perform the additional step of fetching the desired state from the controller. The agent can then compute the delta and apply it to the actual state. This has the advantage that the agent is able to combine the effects of multiple changes (“interrupt debounce”). By coalescing the interrupts, the agent is able to limit the rate of change. Of course the network could cause some of these interrupts to get “lost” as well. Lost interrupts can cause the actual state to diverge from the desired state for long periods of time. Finally, if the desired state is very large, the agent and the orchestrator have to coordinate to efficiently determine the change to the desired state.
  4. The agent could poll the controller for the desired state. There is no problem of lost interrupts; the next polling cycle will always fetch the latest desired state. The polling rate is critical here: if it is too fast, it risks overwhelming the orchestrator even when there are no changes to the desired state; if too slow, it will not converge the actual state to the desired state quickly enough.
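To make the difference between patterns 1 and 2 concrete, here is a toy simulation of one lost message. The message lists and the `delivered` flags are invented for the illustration; no real transport is involved.

```python
# The orchestrator wants the container count to go 2 -> 3 -> 4.
full_states = [2, 3, 4]          # pattern 1: each message carries the whole state
deltas = [+2, +1, +1]            # pattern 2: each message carries only the change
delivered = [True, False, True]  # assumption: the second message is lost

actual_full = 0
actual_delta = 0
for ok, state, delta in zip(delivered, full_states, deltas):
    if not ok:
        continue                 # the network dropped this message
    actual_full = state          # full state: the latest message wins
    actual_delta += delta        # delta: every message must arrive exactly once

print(actual_full)   # 4: converged despite the loss
print(actual_delta)  # 3: silently wrong, and the agent cannot tell
```

With full-state messages the next successful delivery repairs the damage; with deltas the error persists until something else corrects it.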

To summarize the potential issues:

  1. The network is not reliable. Commands or interrupts can be lost or agents can restart / disconnect: there has to be some way for the agent to recover the desired state.
  2. The desired state can be prohibitively large. There needs to be some way to efficiently but accurately communicate the delta to the agent.
  3. The rate of change of the desired state can strain the orchestrator, the network and the agent. To preserve the stability of the system, the agent and orchestrator need to coordinate to limit the rate of change, the polling rate and to execute the changes in the proper linear order.
  4. Only the latest desired state matters. There has to be some way for the agent to discard all the intermediate (“stale”) commands and interrupts that it has not been able to process.
  5. Delta computation (the difference between two consecutive sets of desired state) can sometimes be more efficiently performed at the orchestrator, in which case the agent is sent the delta. Loss of the delta message or reordering of execution can lead to irrecoverable problems.
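Point 4 ("only the latest desired state matters") can be sketched with a queue that the agent drains before doing any work. The `drain_latest` helper is hypothetical, and the trick is only valid when each queued item is a complete desired state rather than a delta.

```python
from collections import deque


def drain_latest(queue: deque):
    """Pop everything that is queued and return only the newest item (or None)."""
    latest = None
    while queue:
        latest = queue.popleft()  # older items are overwritten, i.e. discarded
    return latest


# Three full desired states piled up while the agent was busy.
pending = deque([{"apache": 2}, {"apache": 3}, {"apache": 5}])
state = drain_latest(pending)
print(state)         # {'apache': 5}
print(len(pending))  # 0
```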

A persistent message queue can solve some of these problems. The orchestrator sends its commands or interrupts to the queue and the agent reads from the queue. The message queue buffers commands or interrupts while the agent is busy processing a desired state request.  The agent and the orchestrator are nicely decoupled: they don’t need to discover each other’s location (IP/FQDN). Message framing and transport are taken care of (no more choosing between Thrift or text or HTTP or gRPC etc).


There are tradeoffs however:

  1. With the command pattern, if the desired state is large, then the message queue could reach its storage limits quickly. If the agent ends up discarding most commands, this can be quite inefficient.
  2. With the interrupt pattern, a message queue is not adding much value since the agent will talk directly to the orchestrator anyway.
  3. It is not trivial to operate / manage / monitor a persistent queue. Messages may need to be aggressively expired / purged, and the promise of persistence may not actually be realized. Depending on the scale of the automation, this overhead may not be worth the effort.
  4. An “at most once” message queue can still lose messages. With “at least once” semantics, the message queue could deliver multiple copies of the same message: the agent has to be able to determine if it is a duplicate. The orchestrator and agent still have to solve some of the end-to-end reliability problems.
  5. Delta computation is not solved by the message queue.
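For tradeoff 4, duplicate detection under “at least once” delivery might look like the sketch below. It assumes every message carries a unique ID chosen by the orchestrator; the `DedupingAgent` class and the ID format are hypothetical, and a real agent would also have to bound the set of remembered IDs.

```python
class DedupingAgent:
    """Applies each message at most once, keyed by a message ID."""

    def __init__(self):
        self.seen_ids = set()
        self.applied = []

    def handle(self, msg_id: str, payload: dict) -> bool:
        if msg_id in self.seen_ids:
            return False          # duplicate delivery: ignore it
        self.seen_ids.add(msg_id)
        self.applied.append(payload)
        return True


agent = DedupingAgent()
first = agent.handle("msg-1", {"apache": 2})
again = agent.handle("msg-1", {"apache": 2})  # the queue redelivers the message
print(first, again, len(agent.applied))       # True False 1
```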

OpenStack (using RabbitMQ) and CloudFoundry (using NATS) have adopted message queues to communicate desired state from the orchestrator to the agent.  Apache CloudStack doesn’t have any explicit message queues, although if one digs deeply, there are command-based message queues simulated in the database and in memory.

Others solve the problem with a combination of interrupts and polling – interrupt to execute the change quickly, poll to recover from lost interrupts.
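A deterministic sketch of that combination follows. The `Orchestrator` and `Agent` classes, the `tick` method and the 30-unit resync interval are all assumptions made for the illustration; time is passed in explicitly so no real clock or network is involved.

```python
class Orchestrator:
    def __init__(self):
        self.desired = {}

    def set_desired(self, desired: dict) -> None:
        self.desired = dict(desired)

    def get_desired(self) -> dict:
        return dict(self.desired)


class Agent:
    def __init__(self, orchestrator: Orchestrator, resync_every: int):
        self.orchestrator = orchestrator
        self.resync_every = resync_every
        self.last_sync = 0
        self.state = {}

    def tick(self, now: int, interrupted: bool) -> None:
        """Fetch the desired state on an interrupt or when the poll is due."""
        if interrupted or now - self.last_sync >= self.resync_every:
            self.state = self.orchestrator.get_desired()
            self.last_sync = now


orch = Orchestrator()
agent = Agent(orch, resync_every=30)

orch.set_desired({"apache": 2})
agent.tick(now=1, interrupted=True)    # interrupt delivered: fast path
orch.set_desired({"apache": 3})
agent.tick(now=2, interrupted=False)   # interrupt lost: the agent is stale
stale = dict(agent.state)
agent.tick(now=31, interrupted=False)  # periodic poll repairs the loss
print(stale, agent.state)              # {'apache': 2} {'apache': 3}
```

The resync interval bounds how long a lost interrupt can leave the agent out of date.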

Kubernetes is one such framework. There are no message queues, and it uses an explicit interrupt-driven mechanism to communicate desired state from the orchestrator (the “API Server”) to its agents (called “controllers”).


(Image courtesy: https://blog.heptio.com/core-kubernetes-jazz-improv-over-orchestration-a7903ea92ca)

Developers can use (but are not forced to use) a controller framework to write new controllers. An instance of a controller embeds an “Informer” whose responsibility is to watch for changes in the desired state and execute a controller function when there is a change. The Informer takes care of caching the desired state locally and computing the delta state when there are changes. The Informer leverages the “watch” mechanism in the Kubernetes API Server (an interrupt-like system that delivers a network notification when there is a change to a stored key or value). The deltas to the desired state are queued internally in the Informer’s memory. The Informer ensures the changes are executed in the correct order.

  • Desired states are versioned, so it is easier to decide to compute a delta, or to discard an interrupt.
  • The Informer can be configured to do a periodic full resync from the orchestrator (“API Server”) – this should take care of the problem of lost interrupts.
  • Apparently, there is no problem of the desired state being too large, so Kubernetes does not explicitly handle this issue.
  • It is not clear if the Informer attempts to rate-limit itself when there are excessive watches being triggered.
  • It is also not clear if at some point the Informer “fast-forwards” through its queue of changes.
  • The watches in the API Server use Etcd watches in turn. The watch server in the API server only maintains a limited set of watches received from Etcd and discards the oldest ones.
  • Etcd itself is a distributed data store that is more complex to operate than say, an SQL database. It appears that the API server hides the Etcd server from the rest of the system, and therefore Etcd could be replaced with some other store.
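The caching and delta computation described above can be approximated in a few lines. To be clear, this is not the Kubernetes Informer API: `LocalCache`, `diff`, the pod names and the image tags are hypothetical, and the sketch only shows how a full resync can be turned into add / update / delete events by comparing against a local cache.

```python
def diff(cached: dict, fresh: dict):
    """Return (added, updated, removed) between two full snapshots."""
    added = {k: fresh[k] for k in fresh.keys() - cached.keys()}
    removed = {k: cached[k] for k in cached.keys() - fresh.keys()}
    updated = {
        k: fresh[k]
        for k in fresh.keys() & cached.keys()
        if fresh[k] != cached[k]
    }
    return added, updated, removed


class LocalCache:
    """Keeps the last known desired state and reports what changed."""

    def __init__(self):
        self.objects = {}

    def resync(self, fresh: dict):
        changes = diff(self.objects, fresh)
        self.objects = dict(fresh)
        return changes


cache = LocalCache()
cache.resync({"pod-a": "apache:2.4", "pod-b": "apache:2.4"})
added, updated, removed = cache.resync({"pod-a": "apache:2.5", "pod-c": "apache:2.4"})
print(added)    # {'pod-c': 'apache:2.4'}
print(updated)  # {'pod-a': 'apache:2.5'}
print(removed)  # {'pod-b': 'apache:2.4'}
```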

I wrote a Network Policy Controller for Kubernetes using this framework and it was the easiest integration I’ve written.

It is clear that the Kubernetes creators put some thought into the architecture, based on their experiences at Google. The Kubernetes design should inspire other orchestrator-writers, or perhaps, should be re-used for other datacenter automation purposes. A few issues to consider:

  • The agents (“controllers”) need direct network reachability to the API Server. This may not be possible in all scenarios, needing another level of indirection.
  • The API server is not strictly an orchestrator; it is better described as a choreographer. I hope to describe this difference in a later blog post, but note that the API server never explicitly carries out a step-by-step flow of operations.

How to manage a million firewalls – part 2

Continuing from my last post where I hinted about the big distributed systems problem involved in managing a CloudStack Basic Zone.

It helps to understand how CloudStack is architected at a high level. CloudStack is typically operated as a cluster of identical Java applications (called the “Management Server” or “MS”). There is a MySQL database that holds the desired state of the cloud. API calls arrive at a management server (through a load balancer). The management server uses the current state as stored in the MySQL database, computes/stores a new state and communicates any changes to the cloud infrastructure.


In response to an API call, the management server(s) usually have to communicate with one or more hypervisors. For example, adding a rule to a security group (a single API call) could involve communicating changes to dozens or hundreds of hypervisors. The job of communicating with the hypervisors is split (“sharded”) among the cluster members. For example, if there are 3000 hypervisors and 3 management servers, then each MS handles communications with 1000 hypervisors. If the API call arrives at MS ‘A’ and needs to update a hypervisor managed by MS ‘B’, then the communication is brokered through B.
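The sharding and brokering can be sketched as follows. The modulo assignment is purely an assumption for the illustration (the text does not say how CloudStack actually assigns hypervisors to management servers), and `owner_of` and `route` are hypothetical names.

```python
from collections import Counter

MANAGEMENT_SERVERS = ["A", "B", "C"]


def owner_of(hypervisor_id: int) -> str:
    """Assumed sharding rule: spread hypervisors evenly by ID."""
    return MANAGEMENT_SERVERS[hypervisor_id % len(MANAGEMENT_SERVERS)]


def route(receiving_ms: str, hypervisor_id: int) -> list:
    """Return the chain of management servers an update passes through."""
    owner = owner_of(hypervisor_id)
    if owner == receiving_ms:
        return [receiving_ms]
    return [receiving_ms, owner]  # brokered through the owning MS


load = Counter(owner_of(h) for h in range(3000))
print(dict(load))     # {'A': 1000, 'B': 1000, 'C': 1000}
print(route("A", 3))  # ['A']
print(route("A", 4))  # ['A', 'B']
```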

Now updating a thousand firewalls  (remember, the firewalls are local to the hypervisor) in response to a single API call requires us to think about the API call semantics. Waiting for all 1000 firewalls to respond could take a very long time. The better approach is to return success to the API and work in the background to update the 1000 firewalls. It is also likely that the update is going to fail on a small percentage of the firewalls. The update could fail due to any number of problems: (transient) network problems between the MS and the hypervisor, a problem with the hypervisor hardware, etc.

This problem can be described in terms of the CAP theorem as well. A piece of state (the state of the security group) is being stored on a number of distributed machines (the hypervisors in this case). When there is a network partition (P), do we want the update to the state to be Consistent (every copy of the state is the same), or do we want the API to be Available (every call gets a response)? Choosing Availability ensures that the API call never fails, regardless of the state of the infrastructure. But it also means that the state is potentially inconsistent across the infrastructure when there is a partition.

A lot of the problems with an inconsistent state can be hand-waved away¹ since the default behavior of the firewall is to drop traffic. So if the firewall doesn’t get the new rule or the new IP address, it means that inconsistency is safe: we are not letting in traffic that we didn’t want to.

A common strategy in AP systems is to be eventually consistent. That is, at some undefined point in the future, every node in the distributed system will agree on the state. So, for example, the API call needs to update a hundred hypervisors, but only 95 of them are available. At some point in the future, the remaining 5 do become available and are updated to the correct state.

When a previously disconnected hypervisor reconnects to the MS cluster, it is easy to bring it up to date, since the authoritative state is stored in the MySQL database associated with the CloudStack MS cluster.

A different distributed systems problem is to deal with concurrent writes. Let’s say you send a hundred API calls in quick succession to the MS cluster to start a hundred VMs. Each VM creation leads to changes in many different VM firewalls. Not every API call lands on the same MS: the load balancer in front of the cluster will distribute it to all the machines in the cluster. Visualizing the timeline:

(Figure: sg_groups_pptx9)

A design goal is to push the updates to the VM firewalls as soon as possible (this is to minimize the window of inconsistency). So, as the API calls arrive, the MySQL database is updated and the new firewall states are computed and pushed to the hypervisors.

While MySQL concurrency primitives allow us to safely modify the database (effectively serializing the updates to the security groups), the order of updates to the database may not be the order of updates that flow to the hypervisor. For example, in the table above, the firewall state computed as a result of the API call at T=0 might arrive at the firewall for VM A after the firewall state computed at T=2. We cannot accept the “older” update.

(Figure: sg_groups_pptx10)

The obvious² solution is to insert the order of computation in the message (update) sent to the firewall. Every time an API call results in a change to the state of a VM firewall, we update a persistent sequence number associated with that VM. That sequence number is transmitted to the firewall along with the new state. If the firewall notices that the latest update received is “older” than the one it has already processed, it just ignores it. In the figure above, the “red” update gets ignored.

A crucial point is that every update to the firewall has to contain the complete state: it cannot just be the delta from the previous state³.

The sequence number has to be stored on the hypervisor so that it can be compared against the sequence number of each received update. The sequence number also optimizes the updates to hypervisors that reconnect after a network partition has healed: if the sequence number matches, then no updates are necessary.
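A minimal sketch of the sequence-number check, assuming one counter per VM firewall and full-state updates. The `VmFirewall` class, its method names and the IP addresses are hypothetical; this is not CloudStack code.

```python
class VmFirewall:
    """Holds the rules for one VM plus the sequence number of the last update."""

    def __init__(self):
        self.seq = 0
        self.allowed_ips = frozenset()

    def apply(self, seq: int, allowed_ips) -> bool:
        """Apply a full-state update unless it is older than what we have."""
        if seq <= self.seq:
            return False                      # stale or duplicate: ignore
        self.seq = seq
        self.allowed_ips = frozenset(allowed_ips)
        return True

    def needs_update(self, db_seq: int) -> bool:
        """On reconnect: compare with the sequence number stored in the database."""
        return self.seq != db_seq


fw = VmFirewall()
newer = fw.apply(2, {"10.0.0.2", "10.0.0.3"})  # the later update arrives first
older = fw.apply(1, {"10.0.0.2"})              # the earlier update arrives late
print(newer, older, sorted(fw.allowed_ips))    # True False ['10.0.0.2', '10.0.0.3']
print(fw.needs_update(2))                      # False: nothing to resend
```

Because every update carries the complete state, dropping the older one loses nothing.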

Well, I’ve tried to keep this part under a thousand words. The architecture discussed here did not converge easily: there were a lot of mistakes and learning along the way. There is no way for other cloud / orchestration systems to re-use this code; however, I hope the reader will learn from my experience!


1. The only case to worry about is when rules are deleted: an inconsistent state potentially means we are allowing traffic when we didn’t intend to. In practice, rule deletes are a very small portion of the changes to security groups. Besides, if the rule exists, it was intentionally created, so it is probably OK to take a little time to delete it.
2. Other (not-so-good) solutions involve locks per VM, and queues per VM.
3. This is a common pattern in orchestrating distributed infrastructure.

How to manage a million firewalls – part 1

In my last post I argued that security groups eliminate the need for network security devices in certain parts of the datacenter. The trick that enables this is the network firewall in the hypervisor. Each hypervisor hosts dozens or hundreds of VMs — and provides a firewall per VM. The figure below shows a typical setup, with Xen as the hypervisor. Ingress network traffic flows through the hardware into the control domain (“dom0”) where it is switched in software (so called virtual switch or vswitch) to the appropriate VM.

(Figure: sg_groups_pptx7)

The vswitch provides filtering functions that can block or allow certain types of traffic into the VM. Traffic between the VMs on the same hypervisor goes through the vswitch as well. The vswitch used in this design is the Linux Bridge; the firewall function is provided by netfilter (“iptables”).

Security groups drop all traffic by default and only allow the traffic permitted by the configured rules. Suppose the red VMs in the figure (“Guest 1” and “Guest 4”) are in a security group “management”. We want to allow access to them from the subnet 192.168.1.0/24 on port 22 (ssh). The iptables rules might look like this:

iptables -A FORWARD -p tcp --dport 22 --src 192.168.1.0/24 -j ACCEPT 
iptables -A FORWARD -j DROP

Line 1 reads: for packets forwarded across the bridge (vswitch) that are destined for port 22, and are from source 192.168.1.0/24, allow (ACCEPT) them. Line 2 reads: DROP everything. The rules form a chain: packets traverse the chain until they match. (This is highly simplified: we want to match on the particular bridge ports that are connected to the VMs in question as well.)
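The first-match behaviour of the two-rule chain can be modelled in a few lines. This is a toy model, not how netfilter is implemented: the `RULES` list and the `verdict` function are invented for the illustration, and the final fallback stands in for a chain policy that this particular chain never reaches.

```python
import ipaddress

# The two rules from above; None means "matches anything".
RULES = [
    {"proto": "tcp", "dport": 22,
     "src": ipaddress.ip_network("192.168.1.0/24"), "target": "ACCEPT"},
    {"proto": None, "dport": None, "src": None, "target": "DROP"},
]


def verdict(proto: str, dport: int, src: str) -> str:
    """Walk the chain top to bottom and return the first matching target."""
    addr = ipaddress.ip_address(src)
    for rule in RULES:
        if rule["proto"] is not None and rule["proto"] != proto:
            continue
        if rule["dport"] is not None and rule["dport"] != dport:
            continue
        if rule["src"] is not None and addr not in rule["src"]:
            continue
        return rule["target"]
    return "ACCEPT"  # assumed chain policy; unreachable with a final DROP rule


print(verdict("tcp", 22, "192.168.1.17"))  # ACCEPT
print(verdict("tcp", 22, "10.0.0.5"))      # DROP
print(verdict("tcp", 80, "192.168.1.17"))  # DROP
```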

Now, let’s say we want to allow members of the ‘management’ group to access each other over ssh as well. Let’s say there are 2 VMs in the group, with IPs of ‘A’ and ‘B’. We calculate the membership and for each VM’s firewall, we write additional rules:

#for VM A
iptables -I FORWARD -p tcp --dport 22 --source B -j ACCEPT
#for VM B
iptables -I FORWARD -p tcp --dport 22 --source A -j ACCEPT

As we add more VMs to this security group, we have to add more such rules to each VM’s firewall. (A VM’s firewall is the chain of iptables rules that are specific to the VM.) If there are ‘N’ VMs in the security group, then each VM has N-1 iptables rules for just this one security group rule. Remember that a packet has to traverse the iptables rules until it matches or gets dropped at the end. Naturally each rule adds latency to a packet (at least to the connection-initiating ones). After a certain number of rules (a few hundred), the latency tends to go up hockey-stick fashion. In a large cloud, each VM could be in several security groups and each security group could have rules that interact with other security groups, easily leading to several hundred rules.
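The growth is easy to put numbers on. The helper names below are hypothetical, and the count covers only the single intra-group ssh rule discussed here.

```python
def rules_per_vm(n: int) -> int:
    """Each VM needs one rule per other member of the group."""
    return n - 1


def rules_total(n: int) -> int:
    """Rules across all N VM firewalls for this one security group rule."""
    return n * (n - 1)


for n in (2, 100, 500):
    print(n, rules_per_vm(n), rules_total(n))
# 2 1 2
# 100 99 9900
# 500 499 249500
```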

Aha, you might say, why not just summarize the N-1 source IPs and write a single rule like:

iptables -I FORWARD -p tcp --dport 22 --source <summary cidr> -j ACCEPT

Unfortunately, this isn’t possible since it is never guaranteed that the N-1 IPs will be in a single CIDR block. Fortunately this is a solved problem: we can use ipsets. We can add the N-1 IPs to a single named set (“ipset”). Then:

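# create the set first (hash:ip holds individual IP addresses)
ipset create mgmt hash:ip
ipset add mgmt <IP1>
ipset add mgmt <IP2>
...
# a single rule matches the packet source against the whole set
iptables -I FORWARD -p tcp --dport 22 -m set --match-set mgmt src -j ACCEPT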

Ipset matching is usually very fast and fixes the ‘scale up’ problem. In practice, I’ve seen it handle tens of thousands of IPs without significantly affecting latency or CPU load.
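An agent would typically generate these commands from the group membership. The sketch below only builds the command strings and does not execute them; `ipset_commands` is a hypothetical helper, and the `hash:ip` set type is my choice for the example.

```python
def ipset_commands(set_name: str, member_ips, port: int) -> list:
    """Build the ipset / iptables commands for one group rule."""
    cmds = [f"ipset create {set_name} hash:ip"]
    cmds += [f"ipset add {set_name} {ip}" for ip in sorted(member_ips)]
    # One iptables rule, no matter how many members the set has.
    cmds.append(
        f"iptables -I FORWARD -p tcp --dport {port} "
        f"-m set --match-set {set_name} src -j ACCEPT"
    )
    return cmds


cmds = ipset_commands("mgmt", {"10.0.0.3", "10.0.0.2"}, 22)
for c in cmds:
    print(c)

iptables_rules = [c for c in cmds if c.startswith("iptables")]
print(len(iptables_rules))  # 1
```

Adding a member later becomes a single `ipset add` on each affected host, and the iptables chain itself does not change.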

The second (perhaps more challenging) problem is that when the membership of a group changes, or a rule is added / deleted, a large number of VM firewalls have to be updated. Since we want to build a very large cloud, this usually means thousands or tens of thousands of hypervisors have to be updated with these changes. Let’s say in the single group/single rule example above, there are 500 VMs in the security group. Adding a VM to the group means that 501 VM firewalls have to be updated. Adding a rule to the security group means that 500 VM firewalls have to be updated. In the worst case, the VMs are on 500 different hosts, making this a very big distributed systems problem.

If we consider a typical datacenter of 40,000 hypervisor hosts, with each hypervisor hosting an average of 25 VMs, this becomes the million firewall problem.
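The arithmetic behind these numbers, written out (the constant and function names are just labels for the figures quoted above):

```python
HOSTS = 40_000
VMS_PER_HOST = 25

total_firewalls = HOSTS * VMS_PER_HOST  # one firewall per VM


def touched_by_new_member(group_size: int) -> int:
    """Existing members learn the new IP; the new VM gets its own firewall."""
    return group_size + 1


def touched_by_new_rule(group_size: int) -> int:
    """Every member's firewall receives the new rule."""
    return group_size


print(total_firewalls)             # 1000000
print(touched_by_new_member(500))  # 501
print(touched_by_new_rule(500))    # 500
```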

Part 2 will examine how this is solved in CloudStack’s Basic Zone.