Integrating RabbitMQ consumers for automatic content creation

System Information
  • Strapi Version: 4.3.6
  • Operating System: Ubuntu 20.04.05
  • Database: MongoDB 5.0.6
  • Node Version: 16.18.1
  • NPM Version: 8.19.2
  • Yarn Version: N/A

i need to implement a rabbitMQ Consumer inside Strapi to read in messages and perform actions accordingly. usually i would just add some api endpoints to trigger the process, but in this case this would lead to a lot of problems with other systems.

the process i need to start includes loading data from multiple services, combining them and generating collection type entries with said data.
this process of synchronizing/creating/deleting entities can be initiated by multiple other services, which provide parts of the necessary data.

the case i try to cover using rabbitmq is the following:
systems A, B and C all need to start the process in a short span of time (around the range of 1 minute), all of them would trigger a sync inside strapi, which in turn would request data from all the systems and start generating/updating entities (at the current point around 1700 pages). to not brick strapi and the other services with high amounts of requests and data processing, i put a rabbit queue between strapi and the other system. the queue deduplicates messages so only one sync is triggered, when multiple system request a sync at the same time and i can lock the sync process in strapi to only have a single instance running at all times.

how would i add a consumer to strapi, that’s always running in the backend?

TIA

For anyone facing the same issue in the future:
i took some inspiration from this Strapi and Nats Streaming Server (pub/sub messaging)
and came up with a solution to the consumer part

in app/src/index.js

'use strict';

const rabbitWrapper = require('../config/customLib/rabbit-wrapper');
module.exports = {
  register(/*{ strapi }*/) {
  },
  bootstrap({strapi}) {
    process.nextTick(async () => {
      await rabbitWrapper.connect(strapi);
      await rabbitWrapper.consume((msg) => {
        //message processing logic goes here
        console.log('i got a message for you:', msg);
      });
    });
  },
};

app/config/customLib/rabbit-wrapper.js

const amqp = require('amqplib');

class RabbitWrapper {
  _queue;
  _channel;

  async consume(fn) {
    await this._channel.consume(this._queue, async function (msg) {
      await rabbitWrapper.processMessage(fn, msg);
    });
  }

  async processMessage(fn, msg) {
    try {
      await fn(msg);
      this._channel.ack(msg);
    } catch (err) {
      console.log('consuming message failed: ',msg);
      this._channel.nack(msg);
    }
  }

  async connect(strapi) {
    const user = strapi.config.get('api.rabbit.user');
    const pass = strapi.config.get('api.rabbit.pass');
    const host = strapi.config.get('api.rabbit.host');

    const client = await amqp.connect(`amqp://${user}:${pass}@${host}`);

    this._queue = strapi.config.get('api.rabbit.queue');
    const ch = await client.createChannel();

    await ch.assertQueue(this._queue, {
      //options
    });
    this._channel = ch;
    console.log('established connection');
  }
}

const rabbitWrapper = new RabbitWrapper();
module.exports = rabbitWrapper;

app/config/api.js

module.exports = ({env}) => ({
  default: {
    //other env stuff
  },
/** more env stuff */
  rabbit: {
    host: env('RABBIT_HOST'),
    user: env('RABBIT_USER'),
    pass: env('RABBIT_PASS'),
    queue: env('RABBIT_QUEUE'),
  }
});

hope this helps someone in the future researching this

1 Like