Using Kafka with AdonisJs application
For microservices-based applications, you can use Kafka to let the services communicate using the publisher-subscriber pattern.
Kafka allows us to create a set of producers and consumers to send and receive messages.
In this blog, we will walk through a basic setup of a Kafka producer and consumer for an Adonis application.
First, let's start with the Kafka setup.
To set up Kafka on your machine, perform the following steps.
Kafka setup on macOS
To install Kafka using Homebrew:
1. Install Kafka
$ brew cask install java
$ brew install kafka
2. Start the Zookeeper
$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
3. Start the Kafka Server
$ kafka-server-start /usr/local/etc/kafka/server.properties
If you face a connection issue, update the listener setting in server.properties, replacing
listeners=PLAINTEXT://:9092
with
listeners=PLAINTEXT://localhost:9092
4. Create a Kafka topic
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sampleTopic
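To confirm the topic was created, you can list the topics known to ZooKeeper:
$ kafka-topics --list --zookeeper localhost:2181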
Kafka setup on Ubuntu
To install Kafka on Ubuntu, you need a system with at least 4 GB of RAM.
1. Kafka requires Java, so install OpenJDK
$ sudo apt update
$ sudo apt install default-jdk
2. Download the required Kafka package; here we are using Kafka 2.3.0
$ wget http://www-us.apache.org/dist/kafka/2.3.0/kafka_2.11-2.3.0.tgz
3. Extract the archive and move the Kafka files to the desired location
$ tar xzf kafka_2.11-2.3.0.tgz
$ mv kafka_2.11-2.3.0 /usr/local/kafka
4. Start the ZooKeeper and Kafka servers
$ cd /usr/local/kafka
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
5. Create a Kafka topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sampleTopic
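Optionally, verify the setup end to end with the console producer and consumer that ship with Kafka; run them in two separate terminals and type a few messages into the producer:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sampleTopic
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sampleTopic --from-beginning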
Configuring Adonis.js Application to use Kafka
Now that the Kafka setup is ready, we will configure our Adonis application to use it.
Install the kafka-node package to use Kafka with the Adonis.js application:
npm i kafka-node
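The code below reads the broker address from a KAFKA_SERVER environment variable, so add it to the application's .env file; the value shown assumes the local broker started earlier:
KAFKA_SERVER=localhost:9092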
Since the Kafka consumer needs to be initialized when the application starts so that it continuously listens for messages published by producers, we will create an Adonis command that starts the consumer.
Create the command for starting the consumer with the following (learn how to create commands in Adonis.js here: Creating Custom Commands with Adonis.js)
adonis make:command startConsumer
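This generates app/Commands/StartConsumer.js. A freshly generated command looks roughly like the sketch below; the start:consumer signature is an assumption, so keep whatever the generator produced:
'use strict'

const { Command } = require('@adonisjs/ace')

class StartConsumer extends Command {
  static get signature () {
    return 'start:consumer'
  }

  static get description () {
    return 'Start the Kafka consumer'
  }

  async handle (args, options) {
    // consumer start-up goes here (next step)
  }
}

module.exports = StartConsumer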
Update the handle method of StartConsumer.js with the following
async handle (args, options) {
  // Notification is the Lucid model defined below
  const Notification = use('App/Models/Notification')
  Notification.consume_events()
}
Here, consume_events() is a static method that belongs to the Notification class defined as follows
const Model = use('Model')
const Env = use('Env')
const kafka = require('kafka-node')

class Notification extends Model {
  static consume_events () {
    const Consumer = kafka.Consumer
    // KAFKA_SERVER holds the broker address, e.g. localhost:9092
    const client = new kafka.KafkaClient({ kafkaHost: Env.get('KAFKA_SERVER') })
    const consumer = new Consumer(client, [{ topic: 'sampleTopic' }], { autoCommit: false })

    consumer.on('message', function (msg) {
      console.log('inside consumer')
      console.log(msg)
    })

    consumer.on('error', function (err) {
      console.log('Error:', err)
    })

    consumer.on('offsetOutOfRange', function (err) {
      console.log('offsetOutOfRange:', err)
    })
  }
}

module.exports = Notification
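Note that with autoCommit set to false, kafka-node will not commit offsets on its own, so after a restart the consumer resumes from the last committed offset. If you want to commit after processing each message, a minimal sketch (not part of the original setup) would be:
consumer.on('message', function (msg) {
  // process the message, then commit the current offset back to Kafka
  consumer.commit(function (err, data) {
    if (err) console.log('Commit failed:', err)
  })
})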
For a producer to publish a message, we need to add the following call at the required trigger; here we are using an afterCreate hook of the User model to publish a message.
const Notification = use('App/Models/Notification')

UserHook.triggerNotifications = async (userInstance) => {
  Notification.produce_event('sampleTopic', 'New user created')
}
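For this hook to fire, it has to be registered in the User model's boot method. In Adonis 4 that typically looks like the snippet below, assuming the hook file lives at app/Models/Hooks/UserHook.js:
class User extends Model {
  static boot () {
    super.boot()
    // run UserHook.triggerNotifications after every user is created
    this.addHook('afterCreate', 'UserHook.triggerNotifications')
  }
}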
Here, produce_event() is another static method added to the Notification class as follows
static produce_event (topic, messages) {
  const Producer = kafka.Producer
  const client = new kafka.KafkaClient({ kafkaHost: Env.get('KAFKA_SERVER') })
  const producer = new Producer(client)

  const payloads = [{
    topic: topic,
    messages: messages
  }]

  producer.on('ready', function () {
    producer.send(payloads, (err, data) => {
      if (err) {
        console.log('[kafka-producer -> ' + topic + ']: broker failed to update')
      } else {
        console.log('[kafka-producer -> ' + topic + ']: broker updated successfully')
      }
    })
  })

  producer.on('error', function (err) {
    console.log('Producer is in error state')
    console.log(err)
  })
}
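Keep in mind that produce_event() creates a new KafkaClient and Producer on every call, which is fine for a demo but wasteful under load. One possible refinement, shown here only as a sketch, is to create the producer once at module level and reuse it:
// created once when the model file is loaded
const client = new kafka.KafkaClient({ kafkaHost: Env.get('KAFKA_SERVER') })
const producer = new kafka.Producer(client)
let producerReady = false
producer.on('ready', () => { producerReady = true })
producer.on('error', (err) => console.log('Producer error:', err))

// inside the Notification class
static produce_event (topic, messages) {
  if (!producerReady) {
    console.log('Producer not ready yet, message not sent')
    return
  }
  producer.send([{ topic, messages }], (err) => {
    if (err) console.log('[kafka-producer -> ' + topic + ']: send failed', err)
  })
}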
To execute the command when the Adonis application starts, configure the server.js file to preload the StartConsumer command. The call that runs it is placed inside kafka.js, defined in the start folder of the application, as follows
const Consumer = use('Consumer')
Consumer.exec()
The Consumer alias is defined inside start/app.js
const aliases = {
  Consumer: 'App/Commands/StartConsumer'
}
and the StartConsumer command is registered in the commands array of the same file as
const commands = ['App/Commands/StartConsumer']
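With the command registered, you can also start the consumer manually from the terminal; assuming the start:consumer signature from the skeleton shown earlier, that would be
$ adonis start:consumer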
Lastly, preload start/kafka.js when the Adonis application starts by updating server.js
new Ignitor(require('@adonisjs/fold'))
.appRoot(__dirname)
.preLoad('start/kafka')
.fireHttpServer()
.catch(console.error)
Now, whenever a user is created, the consumer will log the received message containing
New user created
Happy Coding 🙂
In case of queries, please feel free to drop a comment.