
Design patterns in orchestrators: transfer of desired state (part 3/N)

Most datacenter automation tools operate on the basis of desired state. Desired state describes what the end state should be, but not how to get there. To simplify a great deal, if the thing being automated is the speed of a car, the desired state may be “60mph”. How to get there (braking, accelerator, gear changes, turbo) isn’t specified. Something (an “agent”) promises to maintain that desired speed.

[Figure: desired state]

The desired state and changes to the desired state are sent from the orchestrator to various agents in a datacenter. For example, the desired state may be “two apache containers running on host X”. An agent on host X will ensure that the two containers are running. If one or more containers die, then the agent on host X will start enough containers to bring the count up to two. When the orchestrator changes the desired state to “3 apache containers running on host X”, then the agent on host X will create another container to match the desired state.
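As a concrete (if simplified) illustration, such an agent boils down to a reconciliation loop. The sketch below is not from any particular tool; listRunning, startContainer and stopContainer are hypothetical stand-ins for calls into a real container runtime:

```go
package main

import "time"

// Desired state for one host: how many containers of an image should run.
type DesiredState struct {
	Image string
	Count int
}

// Hypothetical stand-ins for the real container-runtime API.
func listRunning(image string) []string { return nil }
func startContainer(image string) error { return nil }
func stopContainer(id string) error     { return nil }

// reconcile drives the actual state toward the desired state. It is
// idempotent: running it repeatedly with the same input converges and
// then does nothing.
func reconcile(desired DesiredState) {
	running := listRunning(desired.Image)
	for i := len(running); i < desired.Count; i++ {
		startContainer(desired.Image) // too few: start more
	}
	for i := len(running); i > desired.Count; i-- {
		stopContainer(running[i-1]) // too many: stop the extras
	}
}

func main() {
	desired := DesiredState{Image: "apache", Count: 2}
	for {
		reconcile(desired) // repairs drift (e.g. crashed containers)
		time.Sleep(10 * time.Second)
	}
}
```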

Transfer of desired state is another way to achieve idempotence (a problem described here).

We can see that there are two sources of changes that the agent has to react to:

  1. changes to desired state sent from the orchestrator and
  2. drift in the actual state due to independent / random events.

Let’s examine #1 in greater detail. There are a few ways to communicate the change in desired state:

  1. Send the new desired state to the agent (a “command” pattern). This approach works most of the time, except when the size of the state is very large. For instance, consider an agent responsible for storing a million objects. Deleting a single object would involve sending the whole desired state (999999 items). Another problem is that the command may not reach the agent (“the network is not reliable”). Finally, the agent may not be able to keep up with the rate of change of the desired state and start to drop some commands. To fix this issue, the system designer might be tempted to run more instances of the agent; however, this usually leads to race conditions and out-of-order execution problems.
  2. Send just the delta from the previous desired state. This is fraught with problems. This assumes that the controller knows for sure that the previous desired state was successfully communicated to the agent, and that the agent has successfully implemented the previous desired state. For example, if the first desired state was “2 running apache containers” and the delta that was sent was “+1 apache container”, then the final actual state may or may not be “3 running apache containers”. Again, network reliability is a problem here. The rate of change is an even bigger potential problem here: if the agent is unable to keep up with the rate of change, it may drop intermediate delta requests. The final actual state of the system may be quite different from the desired state, but the agent may not realize it! Idempotence in the delta commands helps in this case.
  3. Send just an indication of change (“interrupt”). The agent has to perform the additional step of fetching the desired state from the controller. The agent can compute the delta and change the actual state to match the delta. This has the advantage that the agent is able to combine the effects of multiple changes (“interrupt debounce”). By coalescing the interrupts, the agent is able to limit the rate of change. Of course the network could cause some of these interrupts to get “lost” as well. Lost interrupts can cause the actual state to diverge from the desired state for long periods of time. Finally, if the desired state is very large, the agent and the orchestrator have to coordinate to efficiently determine the change to the desired state.
  4. The agent could poll the controller for the desired state. There is no problem of lost interrupts; the next polling cycle will always fetch the latest desired state. The polling rate is critical here: if it is too fast, it risks overwhelming the orchestrator even when there are no changes to the desired state; if too slow, it will not converge the actual state to the desired state quickly enough. A sketch that combines this polling fallback with the interrupt pattern above follows this list.
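Here is a minimal sketch of an agent loop that combines patterns #3 and #4: it reacts to interrupts, coalesces bursts of them, and polls periodically to recover from lost notifications. fetchDesiredState and reconcile are hypothetical placeholders for the orchestrator fetch and the actual state change:

```go
package main

import "time"

// Hypothetical placeholders: the first pulls the full, versioned desired
// state from the orchestrator, the second changes the actual state to match.
func fetchDesiredState() (version int, state string) { return 0, "" }
func reconcile(state string)                         {}

// agentLoop reacts to change notifications ("interrupts"), coalesces bursts
// of them, and also polls periodically to recover from lost notifications.
func agentLoop(interrupts <-chan struct{}) {
	resync := time.NewTicker(5 * time.Minute) // polling fallback
	lastApplied := -1
	for {
		select {
		case <-interrupts:
			// Debounce: drain interrupts that piled up while we were busy,
			// so a burst of changes results in a single fetch.
		drain:
			for {
				select {
				case <-interrupts:
				default:
					break drain
				}
			}
		case <-resync.C:
			// Periodic full resync even if no interrupt arrived.
		}
		version, state := fetchDesiredState()
		if version == lastApplied {
			continue // stale or duplicate notification; nothing new to do
		}
		reconcile(state)
		lastApplied = version
	}
}

func main() {
	// In a real agent, the orchestrator's notification mechanism would feed
	// this channel; here it is just created and left idle.
	interrupts := make(chan struct{}, 16)
	go agentLoop(interrupts)
	select {} // block forever
}
```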

To summarize the potential issues:

  1. The network is not reliable. Commands or interrupts can be lost, or agents can restart or disconnect: there has to be some way for the agent to recover the desired state.
  2. The desired state can be prohibitively large. There needs to be some way to efficiently but accurately communicate the delta to the agent.
  3. The rate of change of the desired state can strain the orchestrator, the network and the agent. To preserve the stability of the system, the agent and orchestrator need to coordinate to limit the rate of change, the polling rate and to execute the changes in the proper linear order.
  4. Only the latest desired state matters. There has to be some way for the agent to discard all the intermediate (“stale”) commands and interrupts that it has not been able to process.
  5. Delta computation (the difference between two consecutive sets of desired state) can sometimes be more efficiently performed at the orchestrator, in which case the agent is sent the delta. Loss of the delta message or reordering of execution can lead to irrecoverable problems.

A persistent message queue can solve some of these problems. The orchestrator sends its commands or interrupts to the queue and the agent reads from the queue. The message queue buffers commands or interrupts while the agent is busy processing a desired state request. The agent and the orchestrator are nicely decoupled: they don’t need to discover each other’s location (IP/FQDN). Message framing and transport are taken care of (no more choosing between Thrift, plain text, HTTP, gRPC, etc.).

[Figure: message queue]

There are tradeoffs however:

  1. With the command pattern, if the desired state is large, then the message queue could reach its storage limits quickly. If the agent ends up discarding most commands, this can be quite inefficient.
  2. With the interrupt pattern, a message queue is not adding much value since the agent will talk directly to the orchestrator anyway.
  3. It is not trivial to operate / manage / monitor a persistent queue. Messages may need to be aggressively expired / purged, and the promise of persistence may not actually be realized. Depending on the scale of the automation, this overhead may not be worth the effort.
  4. A message queue with “at most once” delivery can still lose messages. With “at least once” semantics, the message queue could deliver multiple copies of the same message: the agent has to be able to determine whether a message is a duplicate (see the sketch after this list). The orchestrator and agent still have to solve some of the end-to-end reliability problems.
  5. Delta computation is not solved by the message queue.
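As a sketch of the duplicate and stale-message handling mentioned in point 4 above, an agent can key on a message ID and a desired-state version. The Message fields here are illustrative and not tied to any particular queue:

```go
package main

import "fmt"

// Message is a desired-state command as it might arrive from a queue with
// "at least once" delivery: the same message can be delivered more than
// once, and messages can arrive out of order.
type Message struct {
	ID      string // unique per command
	Version int    // monotonically increasing desired-state version
	Body    string
}

// Agent drops duplicates and stale versions, so re-delivered or reordered
// messages do not move the actual state backwards.
type Agent struct {
	seen        map[string]bool
	lastVersion int
}

func (a *Agent) handle(m Message) {
	if a.seen[m.ID] {
		return // duplicate delivery
	}
	a.seen[m.ID] = true
	if m.Version <= a.lastVersion {
		return // stale: a newer desired state was already applied
	}
	a.lastVersion = m.Version
	fmt.Println("applying desired state:", m.Body)
}

func main() {
	a := &Agent{seen: map[string]bool{}}
	a.handle(Message{ID: "m1", Version: 1, Body: "2 apache containers"})
	a.handle(Message{ID: "m1", Version: 1, Body: "2 apache containers"}) // duplicate: ignored
	a.handle(Message{ID: "m2", Version: 2, Body: "3 apache containers"})
}
```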

OpenStack (using RabbitMQ) and CloudFoundry (using NATS) have adopted message queues to communicate desired state from the orchestrator to the agent.  Apache CloudStack doesn’t have any explicit message queues, although if one digs deeply, there are command-based message queues simulated in the database and in memory.

Others solve the problem with a combination of interrupts and polling – interrupt to execute the change quickly, poll to recover from lost interrupts.

Kubernetes is one such framework. There are no message queues, and it uses an explicit interrupt-driven mechanism to communicate desired state from the orchestrator (the “API Server”) to its agents (called “controllers”).

[Figure courtesy of Heptio: https://blog.heptio.com/core-kubernetes-jazz-improv-over-orchestration-a7903ea92ca]

Developers can use (but are not forced to use) a controller framework to write new controllers. An instance of a controller embeds an “Informer” whose responsibility is to watch for changes in the desired state and execute a controller function when there is a change. The Informer takes care of caching the desired state locally and computing the delta state when there are changes. The Informer leverages the “watch” mechanism in the Kubernetes API Server (an interrupt-like system that delivers a network notification when there is a change to a stored key or value). The deltas to the desired state are queued internally in the Informer’s memory. The Informer ensures the changes are executed in the correct order.

  • Desired states are versioned, so it is easier to decide to compute a delta, or to discard an interrupt.
  • The Informer can be configured to do a periodic full resync from the orchestrator (“API Server”) – this should take care of the problem of lost interrupts.
  • Apparently, there is no problem of the desired state being too large, so Kubernetes does not explicitly handle this issue.
  • It is not clear if the Informer attempts to rate-limit itself when there are excessive watches being triggered.
  • It is also not clear if at some point the Informer “fast-forwards” through its queue of changes.
  • The watches in the API Server use Etcd watches in turn. The watch server in the API Server maintains only a limited window of watch events received from Etcd and discards the oldest ones.
  • Etcd itself is a distributed data store that is more complex to operate than, say, an SQL database. It appears that the API server hides the Etcd server from the rest of the system, and therefore Etcd could be replaced with some other store.
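Putting the pieces above together, here is a rough sketch of an Informer-based controller using client-go’s SharedInformerFactory. The package paths and the 30-second resync interval reflect current client-go and are illustrative, not necessarily the framework exactly as it existed when this was written:

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig (assumes ~/.kube/config).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// The factory caches the desired state locally and does a periodic full
	// resync (here every 30s) to recover from lost watch notifications.
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			fmt.Println("added:", obj.(*corev1.Pod).Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldPod, newPod := oldObj.(*corev1.Pod), newObj.(*corev1.Pod)
			// Resource versions let the handler skip no-op resync updates.
			if oldPod.ResourceVersion == newPod.ResourceVersion {
				return
			}
			fmt.Println("updated:", newPod.Name)
		},
		DeleteFunc: func(obj interface{}) {
			if pod, ok := obj.(*corev1.Pod); ok {
				fmt.Println("deleted:", pod.Name)
			}
		},
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	// Block until the local cache reflects the full desired state.
	cache.WaitForCacheSync(stop, podInformer.HasSynced)
	<-stop
}
```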

I wrote a Network Policy Controller for Kubernetes using this framework and it was the easiest integration I’ve written.

It is clear that the Kubernetes creators put some thought into the architecture, based on their experiences at Google. The Kubernetes design should inspire other orchestrator-writers, or perhaps, should be re-used for other datacenter automation purposes. A few issues to consider:

  • The agents (“controllers”) need direct network reachability to the API Server. This may not be possible in all scenarios, requiring another level of indirection.
  • The API server is not strictly an orchestrator; it is better described as a choreographer. I hope to describe this difference in a later blog post, but note that the API server never explicitly carries out a step-by-step flow of operations.

Automated configuration of NetScaler Loadbalancer for Kubernetes, Mesos and Docker Swarm

There are an incredible number of cluster managers for containerized workloads. The top clustered container managers include Google’s Kubernetes, the Marathon framework on Apache Mesos and Docker Swarm. While these managers offer powerful scheduling and autonomic capabilities, integration with external load balancers is often left as an exercise for the user. Since load balancers are essential components in a horizontally scaled microservice architecture, this omission can impede the roll-out of your chosen container manager. Further, since a microservice architecture demands rapid deployment, any solution has to be able to keep up with the changes in the topology and structure of the microservice.

Citrix NetScaler is an application delivery controller widely used in load balancing applications at several Web-scale companies. This blog post describes Nitrox, a containerized application that can work with Docker Swarm or Kubernetes or Marathon (on Apache Mesos) to automatically reconfigure a Citrix NetScaler instance in response to application events such as deployment, rolling upgrades and auto/manual scale.

The figure below shows a scaling event that causes the number of backend containers for application α to grow from 4 to 6. The endpoints of the additional containers have to be provisioned into the NetScaler as a result.

[Figure: scale-out event]

All cluster managers offer an event stream of application lifecycle events. In this case, Docker Swarm sends a container start event; Marathon sends a status_update_event with a taskStatus field and Kubernetes sends an endpoint update event. The job of Nitrox is to listen to these event streams and react to the events. In most cases this fires a query back to the cluster manager to obtain the new list of endpoints. This list of endpoints is then configured into NetScaler.

[Figure: Nitrox architecture]

The reconfiguration of the NetScaler is idempotent and complete: if an endpoint already exists in the NetScaler configuration, it isn’t re-added, which prevents unnecessary reconfiguration. The set of endpoints sent to the NetScaler is non-incremental: the entire set is sent every time. This overcomes any problem with missed or dropped events and makes the NetScaler configuration eventually consistent.
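That reconciliation can be sketched as a set comparison. The NetScaler-facing calls below (currentEndpoints, bindEndpoint, unbindEndpoint) are hypothetical placeholders rather than the actual NITRO API calls Nitrox makes:

```go
package main

import "fmt"

// Endpoint is a backend address:port for the load-balanced application.
type Endpoint struct {
	IP   string
	Port int
}

// Hypothetical placeholders for calls to the load balancer's API.
func currentEndpoints(app string) map[Endpoint]bool { return map[Endpoint]bool{} }
func bindEndpoint(app string, e Endpoint)           { fmt.Println("bind", app, e) }
func unbindEndpoint(app string, e Endpoint)         { fmt.Println("unbind", app, e) }

// reconfigure takes the complete (non-incremental) set of desired endpoints
// and makes only the changes that are actually needed. Re-running it with
// the same input is a no-op, so missed or duplicated events are harmless.
func reconfigure(app string, desired []Endpoint) {
	current := currentEndpoints(app)
	want := map[Endpoint]bool{}
	for _, e := range desired {
		want[e] = true
		if !current[e] {
			bindEndpoint(app, e) // new backend: add it
		}
	}
	for e := range current {
		if !want[e] {
			unbindEndpoint(app, e) // backend went away: remove it
		}
	}
}

func main() {
	reconfigure("alpha", []Endpoint{{"10.0.0.4", 8080}, {"10.0.0.5", 8080}})
}
```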

Another choice made was to let the application / NetScaler admin provision the frontend details of the application. The frontend has myriad options such as lbmethod, persistence and stickiness. It is likely that each application has different needs for this configuration; it is also assumed to be configured once and to be independent of the size and scope of the backends.

You can find the Nitrox code and instructions here. The container is available on Docker Hub as chiradeeptest/nitrox. The containerized Nitrox can be scheduled like a regular workload on each of the container managers: docker run on Docker Swarm, an app on Marathon and a replication controller on Kubernetes.

Implementation Notes

Kubernetes (and, optionally, Docker Swarm) requires virtual networking (Kubernetes is usually used with flannel). Therefore the container endpoints are endpoints on a virtual network. Since the NetScaler doesn’t participate in the virtual network (consider a non-virtualized NetScaler), this becomes a problem. For Docker Swarm, Nitrox assumes that bridged networking is used.

For Kubernetes, it is assumed that the service (app) being load balanced is configured to use the NodePort style of exposing the service to external access. Kubernetes chooses a random port and exposes this port on every node in the cluster. Each node has a proxy that can provide access on this port. The proxy load balances the ingress traffic to each backend pod (container). One strategy, then, would be to simply configure the NetScaler to load balance to every node in the cluster. However, if the application has, say, 2 containers but the cluster has 50 nodes, the NetScaler would needlessly send traffic to many nodes. To make this more efficient, Nitrox figures out the list of nodes that the containers are actually running on and provisions only these endpoints on the NetScaler.
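Roughly, that lookup could be done with client-go as below. The namespace, service name and label selector are illustrative, and Nitrox’s own implementation may differ in detail:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// The NodePort assigned to the (illustrative) service "myapp".
	svc, err := clientset.CoreV1().Services("default").Get(context.TODO(), "myapp", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	nodePort := svc.Spec.Ports[0].NodePort

	// Only the nodes that actually run a pod of the application.
	pods, err := clientset.CoreV1().Pods("default").List(context.TODO(),
		metav1.ListOptions{LabelSelector: "app=myapp"})
	if err != nil {
		panic(err)
	}
	nodes := map[string]bool{}
	for _, pod := range pods.Items {
		if pod.Status.HostIP != "" {
			nodes[pod.Status.HostIP] = true
		}
	}

	// These hostIP:nodePort pairs are the endpoints to provision on the
	// load balancer, instead of every node in the cluster.
	for ip := range nodes {
		fmt.Printf("%s:%d\n", ip, nodePort)
	}
}
```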