ℹ️ disclaimer
This article represented my mental model at the time of writing, but I’m always iterating on it.
Delivery guarantee for events
When an application saves some information about an entity to a data store and then propagates the change to a broker (RabbitMQ, Kafka, …), there are two transactions:
- between application and data store
- between application and broker
So, in case of an application failure, a network problem between the application and the broker, or any other outage, the change may not be sent to the broker (it will be saved in the data store, but not propagated to the broker).
In code this problem might look something like this:
fun changeUserAddress(userId: String, newAddress: String) {
    val user = dataStore.findUser(userId) // hypothetical lookup helper
    user.address = newAddress

    dataStore.persist(user)   // transaction 1: application and data store
    publisher.publish(user)   // transaction 2: application and broker, may fail independently
}
Transactional outbox pattern
To deliver your events (changes) to the broker with a guarantee, you can use the Transactional outbox pattern.
The main idea of this pattern is to use only one transaction, the data store transaction:
- persist the entity (in entity_table)
- persist the event for propagation (in delivery_table)
In this single transaction we achieve atomic behaviour with regard to the data store. Then a separate background process in the application can read this delivery_table (pull it) and eventually send the events to the broker (with at-least-once semantics).
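A delivery_table row needs just enough data to re-send the event later. A minimal sketch of its shape (all field names here are assumptions, not a fixed schema):

import java.time.Instant

// Hypothetical shape of a delivery_table row.
data class OutboxEvent(
    val id: String,           // unique event id, used to remove the row after publishing
    val aggregateId: String,  // id of the entity the event belongs to
    val type: String,         // e.g. "UserAddressChanged"
    val payload: String,      // serialized event body
    val createdAt: Instant    // lets the background process send events in order
)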
To reduce the latency of delivering events to the broker (the background process adds some delay to publishing), the application should also try to send the event right after the data store transaction is successfully committed, and then remove the event from delivery_table in another, independent data store transaction. Any exception while sending the event must not affect the main business flow; in case of failure, the background process will eventually re-send it.
Pattern actions:
- persist the entity and its events in a single data store transaction
- try to send the event in safe mode within the main business flow; on success, remove the event from delivery_table in another transaction
- in case the previous step fails, the background process will eventually re-send the event
fun changeUserAddress(userId: String, newAddress: String) {
    val user = dataStore.findUser(userId)  // hypothetical lookup helper
    user.address = newAddress
    val event = UserAddressChanged(user)   // hypothetical event type

    // single data store transaction: entity and event are persisted atomically
    transaction {
        dataStore.persist(user)
        dataStore.persist(event)
    }

    // best-effort immediate send; failures must not break the business flow
    val eventId = event.id
    try {
        publisher.publish(event)
        dataStore.remove(eventId) // separate, independent transaction
    } catch (e: Exception) {
        logger.debug("Could not send event $event, background process will send it later")
    }
}
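For completeness, the background process that drains delivery_table could look like the sketch below; dataStore.findPendingEvents and the polling loop are assumptions, not a fixed API:

// Minimal sketch of the background relay over delivery_table.
fun relayPendingEvents() {
    while (true) {
        for (event in dataStore.findPendingEvents(limit = 100)) { // oldest first
            try {
                publisher.publish(event)   // at-least-once: duplicates are possible
                dataStore.remove(event.id)
            } catch (e: Exception) {
                logger.debug("Will retry event ${event.id} on the next pass")
            }
        }
        Thread.sleep(1_000) // polling interval, tune to your latency needs
    }
}

Since the relay may re-publish an event that the main flow already sent (or crash between publish and remove), consumers should be idempotent.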
What if the database doesn't support transactions (NoSQL-like)?
If the data store does not support transactions, you can achieve transactional behaviour by appending events to the entity payload:
{
"user_id: "usr-1",
"payload": {
"user_id: "usr-1",
"first_name": "John",
"last_name": "Wick",
"version": 7,
...
},
"events": [
{
"event_id": "1",
...
},
{
"event_id": "2",
...
}
]
}
Where:
- user_id - entity id
- payload - entity body
- events - events to publish eventually
Now, to access these events and publish them, we have several ways:
- read the events array in the application's background process and use optimistic locks to clean it up; keep in mind the events array might be appended to by subsequent business transactions, so there is a chance of race conditions, which is why optimistic locks are recommended here (see the sketch after this list)
- use the CDC approach (recommended way)
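A minimal sketch of the first option, assuming hypothetical findDocument and compareAndSet helpers that use the version field from the payload above as the optimistic lock:

// Publish embedded events, then clear the array only if the document's
// version has not changed since it was read (optimistic lock).
fun drainEmbeddedEvents(userId: String) {
    val doc = dataStore.findDocument(userId)   // hypothetical read
    doc.events.forEach { event ->
        publisher.publish(event)               // at-least-once semantics
    }
    val cleared = dataStore.compareAndSet(     // hypothetical CAS-style update
        id = userId,
        expectedVersion = doc.payload.version,
        update = { it.copy(events = emptyList()) }
    )
    if (!cleared) {
        logger.debug("Concurrent write on $userId, events will be drained on the next pass")
    }
}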
CDC (Change Data Capture) stream
Change data capture (CDC) refers to the tracking of all changes in a data source (databases, data warehouses, etc.) so they can be captured in destination systems. In short, CDC allows organizations to achieve data integrity and consistency across all systems and deployment environments.
It means that when we insert, update, or remove entities in the data store, we can subscribe to a stream of the data store's change events. We can capture these CDC events in the application's background process, extract the events array, and publish its contents to the broker.
As an example, MongoDB has the watch driver method to get a CDC event stream to handle.
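A sketch of consuming that stream from Kotlin with the MongoDB Java driver; the database, collection, and field names follow the JSON example above, and publisher is assumed to exist:

import com.mongodb.client.MongoClients
import com.mongodb.client.model.changestream.FullDocument
import org.bson.Document

// Subscribe to the collection's change stream and publish embedded events.
fun watchUserChanges() {
    val users = MongoClients.create("mongodb://localhost:27017")
        .getDatabase("app")      // assumed database name
        .getCollection("users")  // assumed collection name

    users.watch()
        .fullDocument(FullDocument.UPDATE_LOOKUP) // include the full document on updates
        .forEach { change ->
            change.fullDocument
                ?.getList("events", Document::class.java)
                ?.forEach { event -> publisher.publish(event) }
        }
}

In production you would also persist the change stream's resume token, so the watcher can continue from where it stopped after a restart.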
Conclusion
The Transactional outbox pattern works well to avoid distributed transactions in your app; it helps you build more robust, failure-resistant applications.