Persist Kafka Messages to MinIO

Kafka is a leading massively parallel message queue system. It supports high throughput and low latency, which makes it ideal for capturing event data from a variety of applications and passing those messages on to other applications. While Kafka can move high data volumes, many use cases also need the data in Kafka queues to be stored for later processing.

MinIO is S3-compatible object storage software designed for high-performance workloads and best suited to large-scale object storage use cases. Together, Kafka and MinIO can be used to ingest, manage, and finally store huge data volumes.

Get Started

In this post we’ll look at how to use Kafka and MinIO to ingest huge data volumes and store them persistently, so that the data is available for later analysis and consumption. We’ll walk through the steps for deploying MinIO and Kafka on Linux.

Install MinIO

We’ll deploy MinIO as a systemd service. First, create a user and group to run the MinIO server.

sudo useradd minio-user -m
sudo passwd minio-user

The steps to deploy MinIO are explained here: https://github.com/minio/minio-service/tree/master/linux-systemd/distributed. Remember to make the following changes to the configuration described there.

  • Update MINIO_VOLUMES with actual paths to volume mounts.
  • Set MINIO_OPTS="--address :9000"
  • Update MINIO_ACCESS_KEY and MINIO_SECRET_KEY with the actual credentials to be used for the MinIO server.
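
As a rough illustration, the edited environment file might end up looking like the sketch below. The volume path /mnt/data is a hypothetical placeholder, and the credentials minio / minio123 are simply the example values this post uses later with mc. Substitute your own.

```shell
# Sketch of the MinIO environment settings after the edits above.
# /mnt/data is a placeholder path (an assumption); use your real volume mounts.
MINIO_VOLUMES="/mnt/data"
# Listen on port 9000 on all interfaces.
MINIO_OPTS="--address :9000"
# Example credentials matching the mc commands used later in this post.
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio123"
```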

Install minio-client

Next, let’s install mc (the MinIO client) to interact with your MinIO deployment.

su -l minio-user
wget https://dl.minio.io/client/mc/release/linux-amd64/mc
chmod +x mc

Install Kafka

We’ll use open source Apache Kafka. Follow this guide through step 4 to install Kafka: https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-centos-7

Integrating Kafka with MinIO

Let’s use the Confluent S3 plugin to journal Kafka messages to MinIO buckets. Note that the plugin version used is 5.2.0. Download and save the plugin.

su -l kafka
mkdir confluent-plugins
cd confluent-plugins
wget https://api.hub.confluent.io/api/plugins/confluentinc/kafka-connect-s3/versions/5.2.0/archive
unzip archive
cd ..
mkdir plugins
mkdir plugins/kafka-connect-s3
mv confluent-plugins/confluentinc-kafka-connect-s3-5.2.0/lib/* plugins/kafka-connect-s3/

Then create the configuration file ~/plugins/connect.properties in the plugins directory.

cd ~/plugins
touch connect.properties

Then add the following contents to the connect.properties file

# Kafka broker IP addresses to connect to
bootstrap.servers=localhost:9092
# Path to directory containing the connector jar and dependencies
plugin.path=/home/kafka/plugins
# Converters to use to convert keys and values
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets

Next, create the s3-sink.properties file in the same location with the following contents

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=minio_topic
s3.region=us-east-1
s3.bucket.name=kafka-bucket
s3.part.size=5242880
flush.size=3
store.url=http://127.0.0.1:9000
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
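
One number in this file is worth a quick sanity check. s3.part.size is given in bytes, and 5242880 is exactly 5 MiB, the smallest part size that S3 multipart uploads accept. The sketch below just reproduces that arithmetic in the shell; the part_size variable is only illustrative.

```shell
# s3.part.size is expressed in bytes: 5 MiB = 5 * 1024 * 1024.
part_size=$((5 * 1024 * 1024))
echo "s3.part.size=${part_size}"
```

If you raise the part size, the same multiplication gives the value to put in the file.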

Create Kafka Topic

Create a Kafka topic called minio_topic. Note that this is the topic we set above in the s3-sink.properties file.

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic minio_topic

Create MinIO bucket

Create the MinIO bucket where data from Kafka will be stored. Note that this is the bucket we referred to above in the s3-sink.properties file. We’ll use mc for this.

su -l minio-user
./mc config host add myminio http://localhost:9000 minio minio123
./mc mb myminio/kafka-bucket

Configure MinIO credentials

su -l kafka
mkdir .aws
vi .aws/credentials

Add the following contents to the ~/.aws/credentials file

[default]
aws_access_key_id = minio
aws_secret_access_key = minio123
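
If you prefer not to edit the file interactively, the same file can be written with a here-document. This is a minimal sketch: it assumes the connector picks up credentials from the default AWS location ~/.aws/credentials of the user running Kafka Connect, and the AWS_DIR variable is introduced here only so the target directory can be overridden. Note that it overwrites any existing credentials file at that location.

```shell
# Directory that holds the credentials file; defaults to the standard AWS location.
# AWS_DIR is a helper variable for this sketch, not something the connector reads.
AWS_DIR="${AWS_DIR:-$HOME/.aws}"
mkdir -p "$AWS_DIR"

# Write the same contents shown above (example credentials from this post).
cat > "$AWS_DIR/credentials" <<'EOF'
[default]
aws_access_key_id = minio
aws_secret_access_key = minio123
EOF

# Keep the secret readable by the owning user only.
chmod 600 "$AWS_DIR/credentials"
```

Run it as the kafka user so that the file lands in that user’s home directory.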

Start Kafka connector

su -l kafka
cd kafka
./bin/connect-standalone.sh ../plugins/connect.properties ../plugins/s3-sink.properties

Publish data to Kafka topic

Publish data to Kafka topic minio_topic

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic minio_topic > /dev/null
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic minio_topic > /dev/null
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic minio_topic > /dev/null

Since flush.size is set to 3, the plugin flushes the data to MinIO once three messages have accumulated in the single partition of the topic minio_topic. You should now see the data in MinIO.

su -l minio-user
./mc ls -r myminio
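
To know what to look for in that listing, it helps to work out the object keys the connector should produce. The sketch below assumes the connector’s default layout with DefaultPartitioner and JsonFormat, that is topics/<topic>/partition=<n>/<topic>+<n>+<start offset padded to 10 digits>.json, and that one object is written per flush.size records. expected_keys is a hypothetical helper written for this post, not part of mc or Kafka.

```shell
# Print the object keys expected for a single partition.
# Usage: expected_keys <topic> <partition> <message_count> <flush_size>
expected_keys() {
  topic=$1 partition=$2 count=$3 flush=$4
  offset=0
  # One object per full batch of flush_size records; leftovers stay buffered.
  while [ $((offset + flush)) -le "$count" ]; do
    printf 'topics/%s/partition=%s/%s+%s+%010d.json\n' \
      "$topic" "$partition" "$topic" "$partition" "$offset"
    offset=$((offset + flush))
  done
}

# Three messages with flush.size=3 should yield exactly one object.
expected_keys minio_topic 0 3 3
```

With the values from this post, the helper prints a single key under topics/minio_topic/partition=0/ inside kafka-bucket. If the mc listing shows a different layout, the connector version or partitioner settings likely differ from what is assumed here.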