Events And Messages¶
At Majority we try to be event-driven. See Design Principles.
As of 2025-05 we use a mix of Azure Service Bus and RabbitMQ, with the goal of migrating to RabbitMQ only. Until then, the messagebus-config configuration can be used to control which messaging system is used.
RabbitMQ¶
RabbitMQ Management¶
Links to Management UI:
- https://stage-rabbitmq.minority.com/
- https://prod-rabbitmq.minority.com/
Connections & Channels¶
- Connections: each instance/pod that connects to Rabbit should have one connection.
- Channels: one channel per queue that the service subscribes to, per instance/pod, plus one additional channel per instance/pod used to publish events.
Exchanges¶
- Two exchanges per event: one main, and one for dead letters.
- The exchange name is the `eventName` given in the `MessageAttribute`: `[Message("ignored in Rabbit", "Ignored in Rabbit", "<AreaName>.<NameOfClass>")]`
- The dead letter exchange is named after the main exchange with an `_dlx` suffix.
Queues¶
- Two queues per service that listens to an event: one main, and one for dead letters.
- The queue is named `<NameOfExchange>.<NameOfServiceListener>`, e.g., `AccountEventRequest_Minority.Galileo.Service`.
- The dead letter queue is named after the main queue with an `_dlq` suffix (`AccountEventRequest_Minority.Galileo.Service_dlq`).
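The naming rules above can be sketched as a small helper. This is illustrative only: the function name is made up, and the `_` separator between exchange name and listener name is taken from the `AccountEventRequest_Minority.Galileo.Service` example above.

```python
def rabbit_names(event_name: str, listener: str) -> dict:
    """Derive RabbitMQ exchange/queue names for an event and a listening service.

    event_name: the eventName from the MessageAttribute, e.g. "<AreaName>.<NameOfClass>"
    listener:   the name of the subscribing service
    """
    exchange = event_name                 # main exchange is named after the event
    queue = f"{exchange}_{listener}"      # separator taken from the doc's example
    return {
        "exchange": exchange,
        "dead_letter_exchange": f"{exchange}_dlx",  # main exchange name + _dlx
        "queue": queue,
        "dead_letter_queue": f"{queue}_dlq",        # main queue name + _dlq
    }
```

For example, `rabbit_names("AccountEventRequest", "Minority.Galileo.Service")` yields the queue and dead letter queue names shown above.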
RabbitMQ Scheduling¶
Scheduling for RabbitMQ is implemented as a database table, platform.MessageOutbox in each area that schedules events.
Setup¶
To use scheduling for RabbitMQ, a few things are needed:
- The area that schedules events must have a database.
- In the area/service config file, it must have a "Default" connection string that will be used by the scheduling code.
- There's a config setting messagebus-config.UseRabbitMqForSchedule that decides whether the feature is used. (Once it's enabled everywhere, this setting will be removed.)
- The MessageOutboxModule must be loaded in startup. It's a good idea to do it just after loading the MinorityMessageBusModule.
Note: If you schedule an event and have set UseRabbitMqForSchedule but haven't done the full setup, you will get a runtime exception.
How it works¶
When publishing a scheduled event, the event will be inserted into the platform.MessageOutbox table.
A background job (MessageOutboxHostedService) runs in each instance that has the MessageOutboxModule loaded, and reads events that are due to be made available, and moves them to the RabbitMQ exchange that handles the event. Once published to RabbitMQ, the event is removed from the platform.MessageOutbox table.
- Publishing of scheduled events is done within a transaction, which guarantees at-least-once publishing (an event may occasionally be published more than once, but never lost).
- The background publishing job will try its best to handle events as quickly as possible:
  - The job checks the table every 60 seconds for events to publish.
  - If the job sees an event that is due sooner than 60 seconds from now, it runs at that time instead. (This shortcut is best effort and not guaranteed.)
- The job is sequential: it reads and publishes one event at a time. If performance is an issue, it can be scaled by running multiple instances (pods).
- Unless there are exceptions, it only logs the `EOUT` event and nothing else (e.g., no `DB` logs for reading from the table).
- When the DB migration script runs, it creates/updates the table & SPs needed. By updating the platform nugets, any new outbox migration scripts will run as part of the normal DB migration.
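The polling behavior described above (a fixed 60-second interval, shortened when an event is due sooner) can be sketched as follows. This is an illustration, not the actual `MessageOutboxHostedService` code; the function name is an assumption.

```python
from typing import Optional

def next_poll_delay(now: float, next_due: Optional[float], interval: float = 60.0) -> float:
    """Seconds until the outbox job should wake up next.

    now:      current time (epoch seconds)
    next_due: ScheduledTime of the earliest pending event, or None if none are pending
    """
    if next_due is None:
        return interval  # nothing pending: wait the full 60 seconds
    # Wake up when the earliest event is due, but never later than the
    # regular interval and never with a negative delay.
    return max(0.0, min(interval, next_due - now))
```

So an event due in 10 seconds shortens the wait to 10 seconds, an overdue event is picked up immediately, and an event due far in the future still gets the regular 60-second check.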
Metrics¶
The outbox publishes a metric to Datadog called `minority.outbox.messages`. It can be grouped by `message_name` and `is_delayed` in addition to the normal tags like `area-name`. `is_delayed` = true counts events with a `ScheduledTime` in the past; false counts events with a `ScheduledTime` in the future.
Note that to query the total count, you need to take the max by (message_name, is_delayed, area_name) and then sum by whichever of those three you want to group by, e.g., `sum(max:minority.outbox.messages{$env, $app-area} by {app-area,message_name,is_delayed}, { app-area })`.
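The max-then-sum shape of that query can be illustrated with a toy aggregation over made-up sample points: within each (message_name, is_delayed, area_name) series the samples are reduced with max first, and only the per-series maxima are summed. The function and data below are illustrative, not Datadog internals.

```python
from collections import defaultdict

def total_outbox_messages(points, group_by):
    """Toy sketch of sum(max ... by {message_name, is_delayed, area_name}).

    points:   list of (tags_dict, value) gauge samples
    group_by: one of "message_name", "is_delayed", "area_name"
    """
    # Step 1: max within each (message_name, is_delayed, area_name) series.
    series_max = {}
    for tags, value in points:
        key = (tags["message_name"], tags["is_delayed"], tags["area_name"])
        series_max[key] = max(series_max.get(key, value), value)
    # Step 2: sum the per-series maxima, grouped by the requested tag.
    totals = defaultdict(float)
    for (message_name, is_delayed, area_name), v in series_max.items():
        tags = {"message_name": message_name, "is_delayed": is_delayed, "area_name": area_name}
        totals[tags[group_by]] += v
    return dict(totals)
```

Two samples from the same series contribute only their maximum, so repeated gauge reports do not inflate the total.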
Manually sending events¶
If you need to publish events to RabbitMQ manually, you can use the MessageOutbox functionality. This works well, especially when there are many events to publish.
The idea is to manually insert the events as scheduled events into the platform.MessageOutbox table. Then the service will publish them automatically.
This is a sample insert that will publish a DatabaseDailyMaintenanceMessage as soon as possible:
```sql
insert into platform.MessageOutbox
    (MessageId, MessageName, ContentType, MessageData, MessageProperties, Ruid, ScheduledTime)
select NEWID(), 'Platform.DatabaseDailyMaintenanceMessage', 'application/json',
    '{"ScheduledOn":"2025-06-05T09:40:06.3345904Z"}', '{"X-Correlation-ID": "772b64e0-07d3-4b7c-a6c0-e24160d184f5"}',
    '772b64e0-07d3-4b7c-a6c0-e24160d184f5', GETUTCDATE()
```
Columns:
- `MessageId` can be a random GUID. Make sure to keep it unique for each event.
- `MessageName` should be the RabbitMQ exchange name (`eventName` in the `MessageAttribute` as described above).
- `MessageData` is the JSON-serialized event class, the body of the event. One easy way to get a starting point is to look at the `EIN` logs for the event.
- `MessageProperties` contains a bunch of properties. It can be an empty object, but it's best to put `X-Correlation-ID` (which is the `RUID`) there to ensure the subscribing code uses the correct RUID.
- `Ruid` should be the same RUID as used in `MessageProperties`.
- `ScheduledTime` is the time the event should be published. If you insert 1000s of events, it can be a good idea to spread them out over time to flatten the load on the system.
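The advice to spread `ScheduledTime` values out can be sketched as a small helper that assigns each event to a one-minute slot. The function name is made up, and the default rate mirrors the rough 5k/minute figure mentioned under "Batch sending events"; test for your own case.

```python
from datetime import datetime, timedelta
from typing import List

def spread_schedule(start: datetime, count: int, per_minute: int = 5000) -> List[datetime]:
    """Assign a ScheduledTime to each of `count` events so that at most
    `per_minute` events fall into any one minute, flattening the load."""
    return [start + timedelta(minutes=i // per_minute) for i in range(count)]
```

For example, 12,000 events starting at 13:00 get scheduled across 13:00, 13:01, and 13:02 instead of all landing in the same minute.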
Additional notes:
- Any service which has the `MessageOutboxModule` loaded can publish any event, regardless of whether it is the owner or normally publishes/subscribes to the event.
- The event publishing code is not optimized to handle many events at the same time. Therefore, it's best to spread out the `ScheduledTime` over some time so that you do not block the table for any other events. E.g., if you put 50k events to be published at 13:00, and another event comes in to be published at 13:01, it will have to wait until all the 50k events are published.
- When you publish events in this way, there will not be any `EOUT` logs, only `EIN`/`EINR`.
- Column `MessageOutboxId` is a BIGINT and serves as the internal primary key for the `MessageOutbox` table. This `MessageOutboxId` is the value returned by the `messagePublisher.ScheduleEvent` call when scheduling an event programmatically.
- Unless explicitly set, the `Sender` column will be populated with the current SQL login name. This is logged as `EventSender` in the `EIN` log.
Batch sending events¶
We previously used the SendEventsToServiceBus job to send batches of events. That job is quite slow and requires using production credentials on dev machines.
With services running the MessageOutbox, we can use it to send messages much more quickly. Testing indicates that 5k/minute leaves some margin to avoid blocking regular message processing. Please test for your individual case, as limits may differ, especially on the consumer end!
Example script to send events in batches:
```sql
DECLARE
    @CorrelationId VARCHAR(50) = '5fd8be7d-71d2-4389-8975-ac1889aa01ab',
    @BatchSize INT = 5000,
    @MaxBatch INT = 100, -- End batch; depends on the source data. With a 5k batch size this would be 100 * 5000 = 500 000 total items. Test with a few batches first to determine performance in the specific case.
    @CurrentBatch INT = 0, -- Starting batch number, usually 0; if you test a few manually, change it to avoid sending duplicates.
    @CurrentDate DATETIME = GETUTCDATE() -- Starting date to schedule messages; hard-code a specific future date instead to delay processing rather than beginning right away.

WHILE @CurrentBatch <= @MaxBatch
BEGIN
    INSERT INTO platform.MessageOutbox (MessageId, MessageName, ContentType, MessageData, MessageProperties, Ruid, ScheduledTime)
    SELECT NEWID(), 'PhonePlan.UpdatePhonePlanAccessLevelMessage', 'application/json',
        '{"UserId":"' + UserId + '","AccessLevel": "Full"}', '{"X-Correlation-ID": "' + @CorrelationId + '"}',
        @CorrelationId, @CurrentDate
    FROM [temp_table]
    ORDER BY UserId
    OFFSET @CurrentBatch * @BatchSize ROWS
    FETCH NEXT @BatchSize ROWS ONLY

    SET @CurrentDate = DATEADD(MINUTE, 1, @CurrentDate)
    SET @CurrentBatch = 1 + @CurrentBatch
END
```