For anyone facing the same issue in the future:
i took some inspiration from this Strapi and Nats Streaming Server (pub/sub messaging)
and came up with a solution to the consumer part
in app/src/index.js
'use strict';
const rabbitWrapper = require('../config/customLib/rabbit-wrapper');
module.exports = {
register(/*{ strapi }*/) {
},
bootstrap({strapi}) {
process.nextTick(async () => {
await rabbitWrapper.connect(strapi);
await rabbitWrapper.consume((msg) => {
//message processing logic goes here
console.log('i got a message for you:', msg);
});
});
},
};
app/config/customLib/rabbit-wrapper.js
const amqp = require('amqplib');
class RabbitWrapper {
_queue;
_channel;
async consume(fn) {
await this._channel.consume(this._queue, async function (msg) {
await rabbitWrapper.processMessage(fn, msg);
});
}
async processMessage(fn, msg) {
try {
await fn(msg);
this._channel.ack(msg);
} catch (err) {
console.log('consuming message failed: ',msg);
this._channel.nack(msg);
}
}
async connect(strapi) {
const user = strapi.config.get('api.rabbit.user');
const pass = strapi.config.get('api.rabbit.pass');
const host = strapi.config.get('api.rabbit.host');
const client = await amqp.connect(`amqp://${user}:${pass}@${host}`);
this._queue = strapi.config.get('api.rabbit.queue');
const ch = await client.createChannel();
await ch.assertQueue(this._queue, {
//options
});
this._channel = ch;
console.log('established connection');
}
}
const rabbitWrapper = new RabbitWrapper();
module.exports = rabbitWrapper;
app/config/api.js
module.exports = ({env}) => ({
default: {
//other env stuff
},
/** more env stuff */
rabbit: {
host: env('RABBIT_HOST'),
user: env('RABBIT_USER'),
pass: env('RABBIT_PASS'),
queue: env('RABBIT_QUEUE'),
}
});
hope this helps someone in the future researching this