For the micro-services based applications, you can use Kafka to allow the micro-services to communicate using the publisher-subscriber pattern.

Kafka allows us to create a set of producers and consumers to send and receive messages.

In this blog, you will get the basic set up of Kafka producer and consumer for an adonis application.

Firstly let’s start with the Kafka set up.

For setting up Kafka on your machine you need to perform the following operations.

Kafka set up on MAC

To install Kafka using brew :

1. Install Kafka

$ brew cask install java
$ brew install kafka

2. Start the Zookeeper

$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

3. Start the Kafka Server

$ kafka-server-start /usr/local/etc/kafka/server.properties

if you face a connection issue, please update the server.properties settings with the following changes

Replace 

listeners=PLAINTEXT://:9092

to

listeners=PLAINTEXT://localhost:9092

4. Create a Kafka topic

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sampleTopic 

Kafka set up on ubuntu

To install Kafka on ubuntu, you would need a system with at least 4GB RAM.

1. Kafka require Java, so to install OpenJDK execute

$ sudo apt update

$ sudo apt install default-jdk

2. Download required Kafka package, here we are using Kafka 2.3.0

wget http://www-us.apache.org/dist/kafka/2.3.0/kafka_2.11-2.3.0.tgz

3. Extract and move the Kafka file to a specified location

$ tar xzf kafka_2.11-2.3.0.tgz
$ mv kafka_2.11-2.3.0 /usr/local/kafka

4. Start the zookeeper and kafka server

$ cd /usr/local/kafka
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

5. Create a kafka topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sampleTopic

Configuring Adonis.js Application to use Kafka

Now since the set up for Kafka is ready, we will configure our adonis application to use Kafka

Use the Kafka-node package for using Kafka with the adonis.js application

npm i kafka-node

Since the Kafka consumer need to be initialized at the start of an application to continuously listen for the messages produced by consumers, we will be creating a command in adonis to start the consumer for listening.

Create the Command for starting the consumers with the following

Learn how to create commands in adonis.js (Creating Custom Commands with Adonis.js)

adonis make:command startConsumer

Update the handle method of the StartConsumer.js with following

async handle (args, options) {
  Notification.consume_events()
}

Here, consume_events() is a static method that belongs to the Notification class defined as follows

const Model = use('Model')
const Env = use('Env')
const kafka = require('kafka-node')
class Notification extends Model {
  static consume_email_events (){
    const Consumer = kafka.Consumer
    const client = new kafka.KafkaClient(Env.get('KAFKA_SERVER'))
    const consumer = new Consumer(client,[{ topic: 'sampleTopic'}],{autoCommit: false})
    consumer.on('message', function (msg) {
      console.log("inside consumer")
      console.log(msg) 
    })
    consumer.on('error', function (err) {
      console.log('Error:',err);
    })
    consumer.on('offsetOutOfRange', function (err) {
      console.log('offsetOutOfRange:',err);
    })
  }
}

module.exports = Notification

For a producer to publish the message, we need to add the following method at the required trigger, here we are using an afterCreate hook of user model to publish a message

UserHook.triggerNotifications = async (userInstance) => {
  Notification.produce_event('sampleTopic','New user created')
}

Here produce_event() is another static method added to the Notification class as follows - 

static produce_event (topic,messages){
  const Producer = kafka.Producer
  const client = new kafka.KafkaClient(Env.get('KAFKA_SERVER'))
  const producer = new Producer(client)
  let payloads = [{
      topic: topic,
      messages: messages
  }] 
  producer.on('ready', function () {
   let push_status = producer.send(payloads, (err, data) => {
    if (err) {
      console.log('[kafka-producer -> '+topic+']: broker failed to update')
    }
    else {
      console.log('[kafka-producer -> '+topic+']: broker updated successfully')
    }
   })
  })
  producer.on('error', function (err) {
    console.log('Producer is in error state')
    console.log(err)
  })
}

To execute the command with the startup of an adonis application, configure the server.js file with preloading the startConsumer command. The Start consumer command is set inside the kafka.js defined in the start folder of the application as follows 

const Consumer = use('Consumer')
Consumer.exec()

The consumer is defined inside the app.js as an alias

const aliases = {
  Consumer: 'App/Commands/StartConsumer'
}

and, the startConsumer command is registered as 

const commands = ['App/Commands/StartConsumer']

Lastly, preload the Kafka with start of adonis application

new Ignitor(require('@adonisjs/fold'))
  .appRoot(__dirname)
  .preLoad('start/kafka')
  .fireHttpServer()
  .catch(console.error)

As and when a user is created, the consumer would log the message

New user created

Happy Coding :)

In case of queries, please feel free to drop a comment.