Boosting GORM Potential with Plugin via Callbacks: A Guide

GORM, the fantastic Go language ORM (Object-Relational Mapping) library, provides a robust framework for interacting with databases. With its extensible architecture, developers can enhance its capabilities through plugins. In this post, we'll build a GORM plugin that sends Amazon SQS (Simple Queue Service) messages when database records are updated.

Introduction to SQSPlugin

The SQSPlugin is designed to seamlessly integrate with GORM and AWS SQS. It automates the process of triggering SQS messages whenever a database update occurs, providing a powerful mechanism for real-time event-driven architectures.

Note: GORM V2 unlocks enhanced capabilities compared to its predecessor, V1. Unlike V1, which lacks built-in functionality for post-commit actions, V2 lets developers extend its behaviour through custom plugins. Let's explore how you can use GORM plugins with callbacks to enrich your database interactions and run extra logic around updates.

Key Features

  1. Automatic Triggering: SQSPlugin registers callbacks with GORM that run before and after each update. The SQS message itself is sent from the after-update callback.

  2. Data Comparison: It compares the old and new values of a chosen field and sends a message only when that value has changed, which avoids publishing redundant events.

  3. Concurrency-safe: A mutex serialises calls to Update and Save, so the plugin's shared state (old data, new data, and the trigger flag) is not modified by concurrent operations.

Plugin Implementation

The SQSPlugin consists of two main components: beforeUpdate and afterUpdate callbacks.

// SQSPlugin is a GORM plugin for triggering SQS messages on updates
type SQSPlugin struct {
    mu           sync.Mutex
    SQSClient    *sqs.SQS
    MysqlSess    *gorm.DB
    QueueURI     *string // Replace with your actual SQS Queue URL
    SQSMessage   *string // Replace with your actual SQS message structure
    OldData      interface{}
    NewData      interface{}
    TriggerEvent bool
}
  • SQSPlugin is a struct representing the plugin. It contains fields for mutex, SQS client, MySQL session, SQS queue URL, SQS message, old and new data, and a flag to trigger events.
func (p *SQSPlugin) Update(value interface{}, column string, query string, args ...interface{}) error {
    // Serialise calls so the plugin's shared state (OldData, NewData, TriggerEvent)
    // is not modified by concurrent updates
    p.mu.Lock()
    defer p.mu.Unlock()

    // Register the plugin's callbacks. Note that this happens on every call;
    // registering once at start-up is usually preferable.
    db := p.MysqlSess
    db.Callback().Update().Before("gorm:before_update").Register("before_update", func(d *gorm.DB) { p.beforeUpdate(d, column) })
    db.Callback().Update().After("gorm:after_update").Register("after_update", p.afterUpdate)

    // Perform the update and return any error reported by GORM
    return db.Where(query, args...).Updates(value).Error
}
  • Update method updates records in the database, registers beforeUpdate and afterUpdate callbacks, and triggers SQS messages based on updates.
func (p *SQSPlugin) Save(value interface{}, column string) error {
    // Serialise calls so the plugin's shared state (OldData, NewData, TriggerEvent)
    // is not modified by concurrent saves
    p.mu.Lock()
    defer p.mu.Unlock()

    // Register the plugin's callbacks. Note that this happens on every call;
    // registering once at start-up is usually preferable.
    db := p.MysqlSess
    db.Callback().Update().Before("gorm:before_update").Register("before_update", func(d *gorm.DB) { p.beforeUpdate(d, column) })
    db.Callback().Update().After("gorm:after_update").Register("after_update", p.afterUpdate)

    // Perform the save and return any error reported by GORM
    return db.Save(value).Error
}
  • Save method saves records in the database, registers beforeUpdate and afterUpdate callbacks, and triggers SQS messages based on saves.
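
Both Update and Save register the same two callbacks each time they are called. One way to avoid repeated registration is to guard it with sync.Once. The following is a minimal, standard-library-only sketch of that idea: callbackRegistry, plugin, and ensureRegistered are hypothetical names invented for illustration and are not part of GORM or of the SQSPlugin code. In the real plugin the watched column would also need to be stored on the plugin rather than captured in a per-call closure.

```go
package main

import (
	"fmt"
	"sync"
)

// callbackRegistry is a hypothetical stand-in for a callback manager;
// it only records the names that were registered with it.
type callbackRegistry struct {
	names []string
}

func (r *callbackRegistry) register(name string) {
	r.names = append(r.names, name)
}

// plugin registers its callbacks at most once, however many times
// ensureRegistered is called.
type plugin struct {
	once     sync.Once
	registry *callbackRegistry
}

func (p *plugin) ensureRegistered() {
	p.once.Do(func() {
		p.registry.register("before_update")
		p.registry.register("after_update")
	})
}

func main() {
	p := &plugin{registry: &callbackRegistry{}}
	for i := 0; i < 3; i++ {
		p.ensureRegistered() // e.g. called at the top of Update and Save
	}
	fmt.Println(len(p.registry.names)) // prints 2
}
```
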
// beforeUpdate is the callback function to be executed before an update operation
func (p *SQSPlugin) beforeUpdate(db *gorm.DB, columnName string) {
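    fmt.Println("Before update callback triggered. Finding existing data...")

    // Reset the flag so an early return below cannot leave a stale value from a previous call
    p.TriggerEvent = false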

    // Debugging: Print the SQL query before execution
    db = db.Debug()

    // Query and store the existing data
    if err := db.First(p.OldData).Error; err != nil {
        // Without the existing row there is nothing to compare against
        fmt.Println("Error finding existing data:", err)
        return
    }
    fmt.Println("Existing data found:", p.OldData)

    updatedData := db.Statement.Dest
    fmt.Println("Updated data:", updatedData)
    jsonData, err := json.Marshal(updatedData)
    if err != nil {
        fmt.Println("Error marshaling data to JSON:", err)
        return
    }

    if err := json.Unmarshal(jsonData, &p.NewData); err != nil {
        fmt.Println("Error unmarshaling JSON to NewData:", err)
        return
    }

    // Use reflection to dynamically access the field value based on the provided column name
    oldValueField := reflect.ValueOf(p.OldData).Elem().FieldByName(columnName)
    newValueField := reflect.ValueOf(p.NewData).Elem().FieldByName(columnName)

    // Check if the field is valid
    if !oldValueField.IsValid() || !newValueField.IsValid() {
        fmt.Println("Invalid column name:", columnName)
        return
    }

    // Check that the field is exported (i.e., its name starts with an uppercase letter)
    if oldValueField.CanInterface() && newValueField.CanInterface() {
        oldValue := oldValueField.Interface()
        newValue := newValueField.Interface()

        // Check if the types are comparable
        if !reflect.TypeOf(oldValue).Comparable() || !reflect.TypeOf(newValue).Comparable() {
            fmt.Println("Non-comparable field type:", columnName)
            return
        }

        // Example condition: trigger an event only if the watched field has changed
        p.TriggerEvent = !reflect.DeepEqual(newValue, oldValue)
    } else {
        fmt.Println("Non-exported field:", columnName)
    }
}
  • beforeUpdate is a callback function executed before an update operation. It fetches existing data, compares it with the updated data, and determines whether to trigger an event.
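
The comparison step can be tried in isolation from GORM. Below is a minimal sketch using only the standard library; the user type and the fieldChanged helper are hypothetical names introduced for this illustration, and the logic mirrors the reflection checks above rather than reproducing the plugin's code.

```go
package main

import (
	"fmt"
	"reflect"
)

// user is a hypothetical model used only for this illustration.
type user struct {
	Name  string
	Email string
}

// fieldChanged reports whether the named struct field differs between
// oldRec and newRec. Both must be non-nil pointers to structs.
func fieldChanged(oldRec, newRec interface{}, field string) (bool, error) {
	oldVal := reflect.ValueOf(oldRec)
	newVal := reflect.ValueOf(newRec)
	if oldVal.Kind() != reflect.Ptr || newVal.Kind() != reflect.Ptr || oldVal.IsNil() || newVal.IsNil() {
		return false, fmt.Errorf("both records must be non-nil pointers")
	}
	if oldVal.Elem().Kind() != reflect.Struct || newVal.Elem().Kind() != reflect.Struct {
		return false, fmt.Errorf("both records must point to structs")
	}

	// FieldByName expects the Go field name (e.g. "Name"), not the column name.
	oldField := oldVal.Elem().FieldByName(field)
	newField := newVal.Elem().FieldByName(field)
	if !oldField.IsValid() || !newField.IsValid() {
		return false, fmt.Errorf("unknown field %q", field)
	}
	if !oldField.CanInterface() || !newField.CanInterface() {
		return false, fmt.Errorf("field %q is not exported", field)
	}
	return !reflect.DeepEqual(oldField.Interface(), newField.Interface()), nil
}

func main() {
	old := &user{Name: "Asha", Email: "asha@example.com"}
	updated := &user{Name: "Asha K", Email: "asha@example.com"}

	changed, err := fieldChanged(old, updated, "Name")
	fmt.Println(changed, err) // prints: true <nil>

	changed, err = fieldChanged(old, updated, "Email")
	fmt.Println(changed, err) // prints: false <nil>
}
```

Returning an error instead of printing makes it easier for the caller to decide whether a failed comparison should block the event or merely be logged.
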
// afterUpdate is the callback function to be executed after an update operation
func (p *SQSPlugin) afterUpdate(db *gorm.DB) {
    fmt.Println("After update callback triggered. Pushing message to SQS...")

    // Do not publish anything if the update itself failed
    if db.Error != nil {
        fmt.Println("Update failed, skipping SQS message:", db.Error)
        return
    }

    if p.TriggerEvent {
        _, err := p.SQSClient.SendMessage(&sqs.SendMessageInput{
            MessageBody: p.SQSMessage,
            QueueUrl:    p.QueueURI,
        })
        if err != nil {
            fmt.Println("Failed to publish message to SQS:", err)
            return
        }
        fmt.Println("Message successfully published to SQS.")
    }
}
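  • afterUpdate is a callback function executed after an update operation. It pushes a message to the SQS queue if a relevant update is detected. Note that with GORM's default transaction handling, a callback registered after gorm:after_update still runs before the gorm:commit_or_rollback_transaction step; to publish only after a successful commit, consider registering it after that callback instead.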
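
The plugin sends whatever string is stored in SQSMessage. If consumers need to know what changed, one option is to build a JSON body from the old and new values. The sketch below uses only the standard library; changeEvent and buildMessageBody are hypothetical names, and the field layout is an assumption, not a format required by SQS or defined by SQSPlugin.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// changeEvent is a hypothetical message shape; adapt it to what your
// consumers expect.
type changeEvent struct {
	Table    string      `json:"table"`
	Column   string      `json:"column"`
	OldValue interface{} `json:"old_value"`
	NewValue interface{} `json:"new_value"`
}

// buildMessageBody serialises a change event to the JSON string that could
// be passed as the message body.
func buildMessageBody(ev changeEvent) (string, error) {
	b, err := json.Marshal(ev)
	if err != nil {
		return "", fmt.Errorf("marshal change event: %w", err)
	}
	return string(b), nil
}

func main() {
	body, err := buildMessageBody(changeEvent{
		Table:    "users",
		Column:   "name",
		OldValue: "Asha",
		NewValue: "Asha K",
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(body)
	// prints {"table":"users","column":"name","old_value":"Asha","new_value":"Asha K"}
}
```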

Example Usage: Consider a scenario where triggering an SQS event is desired whenever a specific column is updated in our database. We can initialise the SQSPlugin with the requisite configurations as follows:

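// Initialize SQSPlugin with necessary configurations.
// OldData and NewData must be non-nil pointers to the model type;
// User is a placeholder for your own model struct.
plugin := &SQSPlugin{
    SQSClient:  sqs.New(mySession),
    MysqlSess:  db,
    QueueURI:   aws.String("your-sqs-queue-url"),
    SQSMessage: aws.String("your-sqs-message"),
    OldData:    &User{},
    NewData:    &User{},
}

// "Name" is the Go struct field to watch; it is looked up by reflection, so use the field name rather than the column name.
if err := plugin.Update(data, "Name", "email = ? AND phone_number = ?", "singhrohanKumar7@gmail.com", "+919113769631"); err != nil {
    fmt.Println("Update failed:", err)
}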

Conclusion

By leveraging the power of GORM and AWS SQS, developers can create highly scalable and responsive applications. The SQSPlugin exemplifies seamless integration between GORM and SQS, enabling the construction of event-driven architectures with ease.

Additionally, GORM exposes callback chains for other operations, including create, query, and delete, so the same approach can be extended beyond updates to further enhance the plugin's capabilities.

With plugins like SQSPlugin, GORM solidifies its position as a preferred choice for developers seeking efficiency and flexibility in Go-based database interactions.

GitHub Repository: SQSPlugin

Feel free to explore and contribute to the SQSPlugin repository!