fix: cassandra + mongo + arangodb issues with webhook
This commit is contained in:
@@ -143,6 +143,11 @@ func NewProvider() (*provider, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sessionIndexQuery := fmt.Sprintf("CREATE INDEX IF NOT EXISTS authorizer_session_user_id ON %s.%s (user_id)", KeySpace, models.Collections.Session)
|
||||
err = session.Query(sessionIndexQuery).Exec()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
userCollectionQuery := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (id text, email text, email_verified_at bigint, password text, signup_methods text, given_name text, family_name text, middle_name text, nickname text, gender text, birthdate text, phone_number text, phone_number_verified_at bigint, picture text, roles text, updated_at bigint, created_at bigint, revoked_timestamp bigint, PRIMARY KEY (id))", KeySpace, models.Collections.User)
|
||||
err = session.Query(userCollectionQuery).Exec()
|
||||
@@ -177,7 +182,7 @@ func NewProvider() (*provider, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
webhookCollectionQuery := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (id text, event_name text, endpoint text, enabled boolean, updated_at bigint, created_at bigint, PRIMARY KEY (id))", KeySpace, models.Collections.Webhook)
|
||||
webhookCollectionQuery := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (id text, event_name text, endpoint text, enabled boolean, headers text, updated_at bigint, created_at bigint, PRIMARY KEY (id))", KeySpace, models.Collections.Webhook)
|
||||
err = session.Query(webhookCollectionQuery).Exec()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@@ -102,6 +102,10 @@ func (p *provider) UpdateUser(ctx context.Context, user models.User) (models.Use
|
||||
continue
|
||||
}
|
||||
|
||||
if key == "_key" {
|
||||
continue
|
||||
}
|
||||
|
||||
if value == nil {
|
||||
updateFields += fmt.Sprintf("%s = null,", key)
|
||||
continue
|
||||
@@ -135,7 +139,19 @@ func (p *provider) DeleteUser(ctx context.Context, user models.User) error {
|
||||
return err
|
||||
}
|
||||
|
||||
deleteSessionQuery := fmt.Sprintf("DELETE FROM %s WHERE user_id = '%s'", KeySpace+"."+models.Collections.Session, user.ID)
|
||||
getSessionsQuery := fmt.Sprintf("SELECT id FROM %s WHERE user_id = '%s' ALLOW FILTERING", KeySpace+"."+models.Collections.Session, user.ID)
|
||||
scanner := p.db.Query(getSessionsQuery).Iter().Scanner()
|
||||
sessionIDs := ""
|
||||
for scanner.Next() {
|
||||
var wlID string
|
||||
err = scanner.Scan(&wlID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sessionIDs += fmt.Sprintf("'%s',", wlID)
|
||||
}
|
||||
sessionIDs = strings.TrimSuffix(sessionIDs, ",")
|
||||
deleteSessionQuery := fmt.Sprintf("DELETE FROM %s WHERE id IN (%s)", KeySpace+"."+models.Collections.Session, sessionIDs)
|
||||
err = p.db.Query(deleteSessionQuery).Exec()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -181,7 +197,7 @@ func (p *provider) ListUsers(ctx context.Context, pagination model.Pagination) (
|
||||
// GetUserByEmail to get user information from database using email address
|
||||
func (p *provider) GetUserByEmail(ctx context.Context, email string) (models.User, error) {
|
||||
var user models.User
|
||||
query := fmt.Sprintf("SELECT id, email, email_verified_at, password, signup_methods, given_name, family_name, middle_name, nickname, birthdate, phone_number, phone_number_verified_at, picture, roles, revoked_timestamp, created_at, updated_at FROM %s WHERE email = '%s' LIMIT 1", KeySpace+"."+models.Collections.User, email)
|
||||
query := fmt.Sprintf("SELECT id, email, email_verified_at, password, signup_methods, given_name, family_name, middle_name, nickname, birthdate, phone_number, phone_number_verified_at, picture, roles, revoked_timestamp, created_at, updated_at FROM %s WHERE email = '%s' LIMIT 1 ALLOW FILTERING", KeySpace+"."+models.Collections.User, email)
|
||||
err := p.db.Query(query).Consistency(gocql.One).Scan(&user.ID, &user.Email, &user.EmailVerifiedAt, &user.Password, &user.SignupMethods, &user.GivenName, &user.FamilyName, &user.MiddleName, &user.Nickname, &user.Birthdate, &user.PhoneNumber, &user.PhoneNumberVerifiedAt, &user.Picture, &user.Roles, &user.RevokedTimestamp, &user.CreatedAt, &user.UpdatedAt)
|
||||
if err != nil {
|
||||
return user, err
|
||||
|
@@ -24,6 +24,11 @@ func (p *provider) AddWebhook(ctx context.Context, webhook models.Webhook) (*mod
|
||||
webhook.CreatedAt = time.Now().Unix()
|
||||
webhook.UpdatedAt = time.Now().Unix()
|
||||
|
||||
existingHook, _ := p.GetWebhookByEventName(ctx, webhook.EventName)
|
||||
if existingHook != nil {
|
||||
return nil, fmt.Errorf("Webhook with %s event_name already exists", webhook.EventName)
|
||||
}
|
||||
|
||||
insertQuery := fmt.Sprintf("INSERT INTO %s (id, event_name, endpoint, headers, enabled, created_at, updated_at) VALUES ('%s', '%s', '%s', '%s', %t, %d, %d)", KeySpace+"."+models.Collections.Webhook, webhook.ID, webhook.EventName, webhook.EndPoint, webhook.Headers, webhook.Enabled, webhook.CreatedAt, webhook.UpdatedAt)
|
||||
err := p.db.Query(insertQuery).Exec()
|
||||
if err != nil {
|
||||
@@ -56,6 +61,10 @@ func (p *provider) UpdateWebhook(ctx context.Context, webhook models.Webhook) (*
|
||||
continue
|
||||
}
|
||||
|
||||
if key == "_key" {
|
||||
continue
|
||||
}
|
||||
|
||||
if value == nil {
|
||||
updateFields += fmt.Sprintf("%s = null,", key)
|
||||
continue
|
||||
@@ -72,7 +81,6 @@ func (p *provider) UpdateWebhook(ctx context.Context, webhook models.Webhook) (*
|
||||
updateFields = strings.TrimSuffix(updateFields, ",")
|
||||
|
||||
query := fmt.Sprintf("UPDATE %s SET %s WHERE id = '%s'", KeySpace+"."+models.Collections.Webhook, updateFields, webhook.ID)
|
||||
|
||||
err = p.db.Query(query).Exec()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -130,7 +138,7 @@ func (p *provider) GetWebhookByID(ctx context.Context, webhookID string) (*model
|
||||
// GetWebhookByEventName to get webhook by event_name
|
||||
func (p *provider) GetWebhookByEventName(ctx context.Context, eventName string) (*model.Webhook, error) {
|
||||
var webhook models.Webhook
|
||||
query := fmt.Sprintf(`SELECT id, event_name, endpoint, headers, enabled, created_at, updated_at FROM %s WHERE event_name = '%s' LIMIT 1`, KeySpace+"."+models.Collections.Webhook, eventName)
|
||||
query := fmt.Sprintf(`SELECT id, event_name, endpoint, headers, enabled, created_at, updated_at FROM %s WHERE event_name = '%s' LIMIT 1 ALLOW FILTERING`, KeySpace+"."+models.Collections.Webhook, eventName)
|
||||
err := p.db.Query(query).Consistency(gocql.One).Scan(&webhook.ID, &webhook.EventName, &webhook.EndPoint, &webhook.Headers, &webhook.Enabled, &webhook.CreatedAt, &webhook.UpdatedAt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -146,7 +154,19 @@ func (p *provider) DeleteWebhook(ctx context.Context, webhook *model.Webhook) er
|
||||
return err
|
||||
}
|
||||
|
||||
query = fmt.Sprintf("DELETE FROM %s WHERE webhook_id = '%s'", KeySpace+"."+models.Collections.WebhookLog, webhook.ID)
|
||||
getWebhookLogQuery := fmt.Sprintf("SELECT id FROM %s WHERE webhook_id = '%s' ALLOW FILTERING", KeySpace+"."+models.Collections.WebhookLog, webhook.ID)
|
||||
scanner := p.db.Query(getWebhookLogQuery).Iter().Scanner()
|
||||
webhookLogIDs := ""
|
||||
for scanner.Next() {
|
||||
var wlID string
|
||||
err = scanner.Scan(&wlID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
webhookLogIDs += fmt.Sprintf("'%s',", wlID)
|
||||
}
|
||||
webhookLogIDs = strings.TrimSuffix(webhookLogIDs, ",")
|
||||
query = fmt.Sprintf("DELETE FROM %s WHERE id IN (%s)", KeySpace+"."+models.Collections.WebhookLog, webhookLogIDs)
|
||||
err = p.db.Query(query).Exec()
|
||||
return err
|
||||
}
|
||||
|
@@ -40,8 +40,8 @@ func (p *provider) ListWebhookLogs(ctx context.Context, pagination model.Paginat
|
||||
query := fmt.Sprintf("SELECT id, http_status, response, request, webhook_id, created_at, updated_at FROM %s LIMIT %d", KeySpace+"."+models.Collections.WebhookLog, pagination.Limit+pagination.Offset)
|
||||
|
||||
if webhookID != "" {
|
||||
totalCountQuery = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE webhook_id='%s'`, KeySpace+"."+models.Collections.WebhookLog, webhookID)
|
||||
query = fmt.Sprintf("SELECT id, http_status, response, request, webhook_id, created_at, updated_at FROM %s WHERE webhook_id = '%s' LIMIT %d", KeySpace+"."+models.Collections.WebhookLog, webhookID, pagination.Limit+pagination.Offset)
|
||||
totalCountQuery = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE webhook_id='%s' ALLOW FILTERING`, KeySpace+"."+models.Collections.WebhookLog, webhookID)
|
||||
query = fmt.Sprintf("SELECT id, http_status, response, request, webhook_id, created_at, updated_at FROM %s WHERE webhook_id = '%s' LIMIT %d ALLOW FILTERING", KeySpace+"."+models.Collections.WebhookLog, webhookID, pagination.Limit+pagination.Offset)
|
||||
}
|
||||
|
||||
err := p.db.Query(totalCountQuery).Consistency(gocql.One).Scan(&paginationClone.Total)
|
||||
|
Reference in New Issue
Block a user