
This post is part of the Debug 101 series. If you missed the previous post in this series, check it out here:

We’re in the middle of deploying Apache Kafka to Kubernetes the cloud-native way - by removing the Zookeeper dependency entirely and using etcd instead. This means that service registry/discovery and other internal Kafka-to-Zookeeper operations are dispatched to a pre-existing etcd cluster. Sweet, isn’t it? No need for yet another third-party system, because we already have etcd as part of Kubernetes, out of the box.

In this post we don’t want to go into detail about why we chose to remove Zookeeper entirely, or why it’s considerably better to rely on etcd when deploying to Kubernetes. Once we’ve completed this project and pushed the PR upstream, we’ll revisit that topic. Meanwhile, if you’d like to consider a single point on this matter - performance - we recommend you read this blog.

To recap, etcd is a distributed key/value store that relies on the Raft consensus algorithm and is used internally by Kubernetes. The Java library for interacting with it is called jetcd - currently under development and in beta. We are using this library to remove the Zookeeper dependency, and it has worked pretty well so far. Earlier this week we arrived at a point in making these changes where we were running a large number of Kafka tests, and a few of them were failing. The problem we encountered was related to transactions. A simple jetcd transaction looks like this:

import com.coreos.jetcd.Client
import com.coreos.jetcd.data.ByteSequence
import com.coreos.jetcd.op.{Cmp, CmpTarget, Op}
import com.coreos.jetcd.options.{GetOption, PutOption}
import scala.util.Try

val client: Client = Client.builder().endpoints("http://localhost:2379").build()

val test = Try {
    client.getKVClient.txn().
        If(new Cmp(ByteSequence.fromString("foo"), Cmp.Op.GREATER, CmpTarget.version(0))). // does `foo` exist (version > 0)?
        Then(Op.get(ByteSequence.fromString("foo"), GetOption.DEFAULT)).                   // yes: read it
        Else(Op.put(ByteSequence.fromString("foo"), ByteSequence.fromString("bar"), PutOption.DEFAULT)). // no: create it with `bar`
        commit().get()
}

It checks whether the key foo exists: if it does, it gets the value, otherwise it creates foo with the value bar. Then we can inspect the outcome:

  if (test.get.isSucceeded)
    println(s"Key `foo` exists and the value is: ${test.get.getGetResponses.get(0).getKvs.get(0).getValue}")
  else
    println("Key `foo` does not exist, creating a new one with value `bar`")
  client.close()

Before we can run this code we need a running etcd cluster. To start one, run the following Docker command:

docker run --rm -d -p 2379:2379 -p 2380:2380 --name etcd \
    quay.io/coreos/etcd:v3.2.9 /usr/local/bin/etcd \
    --data-dir=/etcd-data --name node1 \
    --initial-advertise-peer-urls http://127.0.0.1:2380 \
    --listen-peer-urls http://0.0.0.0:2380 \
    --advertise-client-urls http://127.0.0.1:2379 \
    --listen-client-urls http://0.0.0.0:2379 \
    --initial-cluster node1=http://127.0.0.1:2380
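Before going back to the code, we can sanity-check that the cluster is reachable from the JVM. This is just a sketch, reusing the client we built above:

    // Sanity check (sketch): a plain get; any connectivity problem
    // surfaces as an exception from the blocking get() on the future
    val resp = client.getKVClient.get(ByteSequence.fromString("foo")).get()
    println(s"etcd is reachable; found ${resp.getKvs.size} `foo` key(s)")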

Back to our Scala code. As anybody might rightfully assume, our first run produces the following output: Key `foo` does not exist, creating a new one with value `bar`. But what happens if we run the code again? Unfortunately, the result is less predictable:

Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
	at java.util.ArrayList.rangeCheck(ArrayList.java:657)
	at java.util.ArrayList.get(ArrayList.java:433)
	at Main$.delayedEndpoint$Main$1(Main.scala:21)
	at Main$delayedInit$body.apply(Main.scala:9)
	at scala.Function0.apply$mcV$sp(Function0.scala:34)
	at scala.Function0.apply$mcV$sp$(Function0.scala:34)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App.$anonfun$main$1$adapted(App.scala:76)
	at scala.collection.immutable.List.foreach(List.scala:389)
	at scala.App.main(App.scala:76)
	at scala.App.main$(App.scala:74)
	at Main$.main(Main.scala:9)
	at Main.main(Main.scala)

Let’s take a closer look at the failing expression: test.get.getGetResponses.get(0).getKvs.get(0).getValue. The test variable holds a Try[TxnResponse], and a TxnResponse can contain three different types of response:

  • PutResponse
  • GetResponse
  • DeleteResponse

In our case we have one GetResponse, because inside the transaction (Then(Op.get(ByteSequence.fromString("foo"), GetOption.DEFAULT))) we issued a single get operation. But, of course, put, get and delete operations can be mixed in a single transaction, so to get back every GetResponse you have to call getGetResponses - see the sketch below.
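For instance, a transaction that both writes and reads would populate two of those lists. This is just a hypothetical sketch - the visits key is made up, and it assumes a jetcd build where the bug described below is already fixed:

    // Mixed transaction (sketch): one put and one get in the same Then branch
    val mixed = client.getKVClient.txn().
        If(new Cmp(ByteSequence.fromString("foo"), Cmp.Op.GREATER, CmpTarget.version(0))).
        Then(Op.put(ByteSequence.fromString("visits"), ByteSequence.fromString("1"), PutOption.DEFAULT),
             Op.get(ByteSequence.fromString("foo"), GetOption.DEFAULT)).
        commit().get()

    // Each operation type lands in its own list on the TxnResponse
    println(s"puts: ${mixed.getPutResponses.size}, gets: ${mixed.getGetResponses.size}")

Now take a closer look at the jetcd API implementation, especially the TxnResponse.java file: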

  /**
   * returns a list of GetResponse; empty list if none.
   */
  public synchronized List<GetResponse> getGetResponses() {
    if (getResponses == null) {
      getResponses = getResponse().getResponsesList().stream()
          .filter((responseOp) -> responseOp.getResponseCase() != RESPONSE_RANGE)
          .map(responseOp -> new GetResponse(responseOp.getResponseRange()))
          .collect(Collectors.toList());
    }

    return getResponses;
  }

In the exception above, we clearly received an empty list. How did this happen? It turns out that this method’s implementation is buggy: it can never return anything but an empty list. Moreover, getPutResponses and getDeleteResponses are affected by the same bug. The problem is inside the filter call: RESPONSE_RANGE is an enum value used to identify the various response types, and a GetResponse corresponds to RESPONSE_RANGE - so the filter discards exactly the responses it is supposed to collect. Instead of !=, == is needed.

Luckily, this bug was fixed on the master branch by this PR. We assume there will be a new release soon. Until then, if Txn support is essential for your work, build the master branch locally with mvn clean install and reference the local Maven repository from your project.
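For example, with sbt, the locally installed snapshot can be picked up like this (a sketch - the version string is an assumption, use whatever mvn clean install actually produced):

    // build.sbt (sketch): resolve the locally built jetcd snapshot from ~/.m2
    resolvers += Resolver.mavenLocal
    // hypothetical snapshot version - check the output of mvn clean install
    libraryDependencies += "com.coreos" % "jetcd-core" % "0.0.2-SNAPSHOT"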