GORM, the fantastic Go language ORM (Object-Relational Mapping) library, provides a robust framework for interacting with databases. With its extensible architecture, developers can enhance its capabilities through plugins. In this blog, we'll delve into the creation of a plugin for GORM that triggers SQS (Simple Queue Service) events upon database updates.
Introduction to SQSPlugin
The SQSPlugin is designed to seamlessly integrate with GORM and AWS SQS. It automates the process of triggering SQS messages whenever a database update occurs, providing a powerful mechanism for real-time event-driven architectures.
Note:GORM V2 unlocks enhanced capabilities compared to its predecessor, V1. Unlike V1, which lacks built-in functionality for post-commit actions, V2 empowers developers to extend its capabilities through custom plugins. Let's explore how you can leverage GORM plugins with callbacks to enrich your database interactions and streamline post-commit operations.
Key Features
Automatic Triggering: SQSPlugin registers callbacks with GORM to automatically trigger SQS messages before and after database updates.
Data Comparison: It intelligently compares old and new data to determine if an SQS message should be triggered, optimising resource usage.
Concurrency-safe: Utilises mutex locks to ensure concurrency safety, preventing race conditions during database operations.
Plugin Implementation
The SQSPlugin consists of two main components: beforeUpdate and afterUpdate callbacks.
// SQSPlugin is a GORM plugin for triggering SQS messages on updates
type SQSPlugin struct {
mu sync.Mutex
SQSClient *sqs.SQS
MysqlSess *gorm.DB
QueueURI *string // Replace with your actual SQS Queue URL
SQSMessage *string // Replace with your actual SQS message structure
OldData interface{}
NewData interface{}
TriggerEvent bool
}
SQSPlugin
is a struct representing the plugin. It contains fields for mutex, SQS client, MySQL session, SQS queue URL, SQS message, old and new data, and a flag to trigger events.
func (p *SQSPlugin) Update(value interface{}, column string, query string, args ...interface{}) error {
// Use a mutex to ensure exclusive access to the database
p.mu.Lock()
defer p.mu.Unlock()
// Register the plugin's callback
db := p.MysqlSess
db.Callback().Update().Before("gorm:before_update").Register("before_update", func(d *gorm.DB) { p.beforeUpdate(d, column) })
db.Callback().Update().After("gorm:after_update").Register("after_update", p.afterUpdate)
// Perform the update
if err := db.Where(query, args...).Updates(value).Error; err != nil {
return err
}
return nil
}
Update
method updates records in the database, registersbeforeUpdate
andafterUpdate
callbacks, and triggers SQS messages based on updates.
func (p *SQSPlugin) Save(value interface{}, column string) error {
// Use a mutex to ensure exclusive access to the database
p.mu.Lock()
defer p.mu.Unlock()
// Register the plugin's callback
db := p.MysqlSess
db.Callback().Update().Before("gorm:before_update").Register("before_update", func(d *gorm.DB) { p.beforeUpdate(d, column) })
db.Callback().Update().After("gorm:after_update").Register("after_update", p.afterUpdate)
// Perform the save
if err := db.Save(value).Error; err != nil {
return err
}
return nil
}
Save
method saves records in the database, registersbeforeUpdate
andafterUpdate
callbacks, and triggers SQS messages based on saves.
// beforeUpdate is the callback function to be executed before an update operation
func (p *SQSPlugin) beforeUpdate(db *gorm.DB, columnName string) {
fmt.Println("Before update callback triggered. Finding existing data...")
// Debugging: Print the SQL query before execution
db = db.Debug()
// Query and store the existing data
if err := db.First(p.OldData).Error; err != nil {
// Handle the error if necessary
fmt.Println("Error finding existing data:", err)
return
} else {
fmt.Println("Existing data found:", p.OldData)
}
updatedData := db.Statement.Dest
fmt.Println("Updated data:", updatedData)
jsonData, err := json.Marshal(updatedData)
if err != nil {
fmt.Println("Error marshaling data to JSON:", err)
return
}
if err := json.Unmarshal(jsonData, &p.NewData); err != nil {
fmt.Println("Error unmarshaling JSON to NewData:", err)
return
}
// Use reflection to dynamically access the field value based on the provided column name
oldValueField := reflect.ValueOf(p.OldData).Elem().FieldByName(columnName)
newValueField := reflect.ValueOf(p.NewData).Elem().FieldByName(columnName)
// Check if the field is valid
if !oldValueField.IsValid() || !newValueField.IsValid() {
fmt.Println("Invalid column name:", columnName)
return
}
// Check if the field is exportable (i.e., if it starts with an uppercase letter)
if oldValueField.CanInterface() && newValueField.CanInterface() {
oldValue := oldValueField.Interface()
newValue := newValueField.Interface()
// Check if the types are comparable
if !reflect.TypeOf(oldValue).Comparable() || !reflect.TypeOf(newValue).Comparable() {
fmt.Println("Non-comparable field type:", columnName)
return
}
// Example condition to trigger an event if any change in the provided column
if !reflect.DeepEqual(newValue, oldValue) {
p.TriggerEvent = true
} else {
p.TriggerEvent = false
}
} else {
fmt.Println("Non-exported field:", columnName)
}
}
beforeUpdate
is a callback function executed before an update operation. It fetches existing data, compares it with the updated data, and determines whether to trigger an event.
// afterUpdate is the callback function to be executed after an update operation
func (p *SQSPlugin) afterUpdate(db *gorm.DB) {
fmt.Println("After update callback triggered. Pushing message to SQS...")
// Debugging: Print the SQL query before execution
_ = db.Debug()
if p.TriggerEvent {
_, err := p.SQSClient.SendMessage(&sqs.SendMessageInput{
MessageBody: p.SQSMessage,
QueueUrl: p.QueueURI,
})
if err != nil {
fmt.Println("Failed to publish message to SQS:", err)
return
}
fmt.Println("Message successfully published to SQS.")
}
}
afterUpdate
is a callback function executed after an update operation. It pushes a message to the SQS queue if a relevant update is detected.
Example Usage: Consider a scenario where triggering an SQS event is desired whenever a specific column is updated in our database. We can initialise the SQSPlugin with the requisite configurations as follows:
// Initialize SQSPlugin with necessary configurations
plugin := &SQSPlugin{
SQSClient: sqs.New(mySession),
MysqlSess: db,
QueueURI: aws.String("your-sqs-queue-url"),
SQSMessage: aws.String("your-sqs-message"),
}
plugin.Update(data,"name", " email = ?, phone_number = ?", singhrohanKumar7@gmail.com, +919113769631)
Conclusion
By leveraging the power of GORM and AWS SQS, developers can create highly scalable and responsive applications. The SQSPlugin exemplifies seamless integration between GORM and SQS, enabling the construction of event-driven architectures with ease.
Additionally, GORM offers the flexibility to utilize callbacks to trigger events after various database operations, including commit, create, delete, and save, further enhancing the plugin's capabilities.
With plugins like SQSPlugin, GORM solidifies its position as a preferred choice for developers seeking efficiency and flexibility in Go-based database interactions.
GitHub Repository:SQSPlugin
Feel free to explore and contribute to the SQSPlugin repository!