Peter's Blog

How to Automate Kafka Topic Management

Peter Mbanugo · Jul 10, 2022 · 5 min read

Photo by Roman Synkevych 🇺🇦 on Unsplash

Kafka topics are the categories used to organize events. You create different topics to hold different kinds of events, and different topics to hold filtered and transformed versions of the same kind of event. As a developer, the topic is among the first set of things you think about when designing how events flow in your system. You create topics and then read or write events to topics.

In a DevOps-driven team where infrastructure is managed and automated via code, how do you automate the creation or deletion of Kafka topics?

In this post, I'm going to show you how to do that using JavaScript. There are different ways to achieve the same thing for different infrastructure setups. For this post, let's assume we're running a Kafka cluster that's only accessible within a VPC, and that we also have a Kubernetes cluster. The solution will be a JavaScript app that runs as a Kubernetes Job whenever a topic needs to be created or deleted.

Why JavaScript? Well, it's an easy way to write a script without the complexities of Bash. If your team already writes JavaScript, it also lowers the barrier for other developers to contribute. If you're a Python shop, the same solution can be implemented in Python.

Set Up The Application

The solution is a Node.js application, so you will need a Node.js project. You can create a new project using the npm init command. If you don't have Node.js and npm, download and install the binaries from nodejs.org/en/download.

Open your terminal in the directory where you want to create the app, then run the command npm init -y. Next, install the Kafka JavaScript client as a dependency using the command npm install kafkajs.

Implementing The Solution

The application will read a list of topics to create/delete through a JSON file. What I want to achieve here is a workflow where anyone can make changes to a JSON file in a GitHub repo, and open a PR with their change. Once the PR is merged into the main branch, the code reads the data from that file and then creates or deletes a list of topics as desired.

To achieve this you should create a JSON file named topics.json with the following content:

{
  "create": [],
  "delete": []
}

That structure allows you to provide arrays of strings containing the names of topics to create or delete. A side benefit is that this file in source control doubles as a record of the topics that exist in Kafka.
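For example, a topics.json that requests two new topics and removes an old one might look like this (the topic names are hypothetical):

```json
{
  "create": ["orders.created", "payments.processed"],
  "delete": ["orders.legacy"]
}
```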

Next, create a file api.js. Copy the following code snippet and paste it into api.js.

// Creates the given topics using the Kafka admin client.
// The partition, replication, and config values here are examples;
// adjust them to match your cluster.
async function createTopics(topics, kafkaAdmin) {
  if (topics.length > 0) {
    await kafkaAdmin.createTopics({
      topics: topics.map((topic) => ({
        topic,
        numPartitions: 1,
        replicationFactor: 3,
        configEntries: [{ name: "min.insync.replicas", value: "2" }],
      })),
    });
  }
}

// Deletes the given topics. A no-op when the list is empty.
async function deleteTopics(topics, kafkaAdmin) {
  if (topics.length > 0) {
    await kafkaAdmin.deleteTopics({ topics });
  }
}

module.exports = { createTopics, deleteTopics };

This module exports functions to create and delete Kafka topics. The createTopics function takes an array of topics and the Kafka admin client instance as arguments. Then it calls kafkaAdmin.createTopics to create the topics. The number of partitions and config entries specified is just an example. You should configure them to match your setup.

Create a new file index.js and paste the following code into it.

const { Kafka } = require("kafkajs");
const { createTopics, deleteTopics } = require("./api");
const topics = require("../topics.json");

const username = process.env.KAFKA_USERNAME;
const password = process.env.KAFKA_PASSWORD;
const brokers = process.env.KAFKA_URL ? process.env.KAFKA_URL.split(",") : [];

if (!username || !password || brokers.length === 0) {
  throw new Error("Missing Kafka client credentials");
}

const kafka = new Kafka({
  clientId: "admin-script",
  brokers: brokers,
  ssl: {
    rejectUnauthorized: false,
  },
  sasl: {
    mechanism: "scram-sha-512",
    username,
    password,
  },
});

const admin = kafka.admin();

admin
  .connect()
  .then(async () => {
    const existingTopics = await admin.listTopics();

    // Only create topics that don't already exist.
    const newTopics = topics.create.filter((x) => !existingTopics.includes(x));
    await createTopics(newTopics, admin);

    // Only delete topics that actually exist.
    const deletionTopics = topics.delete.filter((x) =>
      existingTopics.includes(x)
    );
    await deleteTopics(deletionTopics, admin);

    await admin.disconnect();
  })
  .catch((err) => {
    // Surface failures so the Kubernetes Job is marked as failed.
    console.error(err);
    process.exit(1);
  });

The code above creates a Kafka client and connects to the Kafka admin API. Once connected, it lists the existing topics, filters the requested changes against that list, calls createTopics and deleteTopics, and then disconnects and exits.
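The filtering step (work out which topics actually need creating or deleting, given the current cluster state) is a pure function, so you can extract and test it without a broker. A minimal sketch, using a hypothetical diffTopics helper and made-up topic names:

```javascript
// diffTopics: given the desired state (the shape of topics.json) and
// the topic names that already exist in the cluster, compute which
// topics to create and which to delete.
function diffTopics(desired, existingTopics) {
  return {
    toCreate: desired.create.filter((t) => !existingTopics.includes(t)),
    toDelete: desired.delete.filter((t) => existingTopics.includes(t)),
  };
}

const { toCreate, toDelete } = diffTopics(
  { create: ["orders", "payments"], delete: ["legacy"] },
  ["orders", "legacy"] // topics currently in the cluster
);

console.log(toCreate); // "orders" already exists, so only "payments"
console.log(toDelete); // "legacy" exists, so it can be deleted
```

Keeping this logic separate from the kafkajs calls makes the script's behavior easy to verify in CI before it ever touches a cluster.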

Automation Using GitHub Actions

Let's assume your code lives in a GitHub repository. Whenever the topics.json file is modified, you want to run a Kubernetes Job that executes the Node.js app. We will do that using GitHub Actions.

Add the file kafka.yml to the directory .github/workflows.

name: Deploy Kafka Topics Job

on:
  push:
    branches: [main]

env:
  JOB_NAME: kafka-topics
  AWS_REGION: eu-west-1
  KUBERNETES_CLUSTER: demo-cluster
  KUBERNETES_NAMESPACE: default

jobs:
  build-and-push:
    name: Build & Push to ECR
    runs-on: ubuntu-latest
    steps:
      - name: Git checkout
        uses: actions/checkout@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Add short commit hash
        id: short-commit-hash
        run: echo "sha_short=$(git rev-parse --short HEAD)" >> "$GITHUB_OUTPUT"

      - name: Build Docker container and push to ECR
        uses: dfreilich/pack-action@v2.1.1
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          IMAGE_TAG: ${{ steps.short-commit-hash.outputs.sha_short }}
        with:
          args: "build ${{ env.ECR_REGISTRY }}/${{ env.JOB_NAME }}:${{ env.IMAGE_TAG }} --builder heroku/buildpacks --buildpack heroku/nodejs --publish"

  deploy-job:
    name: Deploy to Kubernetes
    needs: [build-and-push]
    runs-on: ubuntu-latest
    steps:
      - name: Git checkout
        uses: actions/checkout@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Add short commit hash
        id: short-commit-hash
        run: echo "sha_short=$(git rev-parse --short HEAD)" >> "$GITHUB_OUTPUT"

      - name: Set Image Name
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          IMAGE_TAG: ${{ steps.short-commit-hash.outputs.sha_short }}
        run: 'echo "IMAGE_NAME=$(echo ${ECR_REGISTRY})/$(echo ${JOB_NAME}):$(echo ${IMAGE_TAG})" >> $GITHUB_ENV'

      - name: Create Job
        env:
          SHA: ${{ steps.short-commit-hash.outputs.sha_short }}
        run: |
          aws eks update-kubeconfig \
            --region ${AWS_REGION} \
            --name ${KUBERNETES_CLUSTER}

          cat <<EOF | kubectl apply -f -
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: ${JOB_NAME}-${SHA}
            namespace: ${KUBERNETES_NAMESPACE}
            labels:
              jobgroup: ${JOB_NAME}
          spec:
            ttlSecondsAfterFinished: 259200
            template:
              spec:
                containers:
                - name: ${JOB_NAME}-${SHA}
                  image: ${IMAGE_NAME}
                  envFrom:
                  - secretRef:
                      name: kafka-secrets
                restartPolicy: Never
            backoffLimit: 2
          EOF

The workflow above has two jobs. The build-and-push job builds a container image using the Pack CLI with the Heroku builder and Node.js buildpack, and pushes the image to Amazon ECR. The deploy-job then creates a Kubernetes Job that runs the built image.

The workflow assumes the use of AWS and GitHub Actions, but the same idea can be replicated with other source control and CI/CD systems. The point is to use JavaScript, a JSON file, and GitHub Actions to automate creating or deleting Kafka topics whenever the JSON file changes. You can adapt the concept to fit your use case.
