feat: add cache clear
This commit is contained in:
parent
02c0ebb9c4
commit
428a0be3db
|
@ -2,6 +2,7 @@ package stores
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -10,6 +11,8 @@ import (
|
||||||
const (
|
const (
|
||||||
// Maximum entries to keep in session storage
|
// Maximum entries to keep in session storage
|
||||||
maxCacheSize = 1000
|
maxCacheSize = 1000
|
||||||
|
// Cache clear interval
|
||||||
|
clearInterval = 10 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
// SessionEntry is the struct for entry stored in store
|
// SessionEntry is the struct for entry stored in store
|
||||||
|
@ -20,30 +23,64 @@ type SessionEntry struct {
|
||||||
|
|
||||||
// SessionStore struct to store the env variables
|
// SessionStore struct to store the env variables
|
||||||
type SessionStore struct {
|
type SessionStore struct {
|
||||||
mutex sync.Mutex
|
wg sync.WaitGroup
|
||||||
|
mutex sync.RWMutex
|
||||||
store map[string]*SessionEntry
|
store map[string]*SessionEntry
|
||||||
itemsToEvict []string
|
// stores expireTime: key to remove data when cache is full
|
||||||
|
// map is sorted by key so older most entry can be deleted first
|
||||||
|
keyIndex map[int64]string
|
||||||
|
stop chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSessionStore create a new session store
|
// NewSessionStore create a new session store
|
||||||
func NewSessionStore() *SessionStore {
|
func NewSessionStore() *SessionStore {
|
||||||
return &SessionStore{
|
store := &SessionStore{
|
||||||
mutex: sync.Mutex{},
|
mutex: sync.RWMutex{},
|
||||||
store: make(map[string]*SessionEntry),
|
store: make(map[string]*SessionEntry),
|
||||||
|
keyIndex: make(map[int64]string),
|
||||||
|
stop: make(chan struct{}),
|
||||||
|
}
|
||||||
|
store.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer store.wg.Done()
|
||||||
|
store.clean()
|
||||||
|
}()
|
||||||
|
return store
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SessionStore) clean() {
|
||||||
|
t := time.NewTicker(clearInterval)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.stop:
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
s.mutex.Lock()
|
||||||
|
currentTime := time.Now().Unix()
|
||||||
|
for k, v := range s.store {
|
||||||
|
if v.ExpiresAt < currentTime {
|
||||||
|
delete(s.store, k)
|
||||||
|
delete(s.keyIndex, v.ExpiresAt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.mutex.Unlock()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns the value of the key in state store
|
// Get returns the value of the key in state store
|
||||||
func (s *SessionStore) Get(key, subKey string) string {
|
func (s *SessionStore) Get(key, subKey string) string {
|
||||||
s.mutex.Lock()
|
s.mutex.RLock()
|
||||||
defer s.mutex.Unlock()
|
defer s.mutex.RUnlock()
|
||||||
currentTime := time.Now().Unix()
|
currentTime := time.Now().Unix()
|
||||||
k := fmt.Sprintf("%s:%s", key, subKey)
|
k := fmt.Sprintf("%s:%s", key, subKey)
|
||||||
if v, ok := s.store[k]; ok {
|
if v, ok := s.store[k]; ok {
|
||||||
if v.ExpiresAt > currentTime {
|
if v.ExpiresAt > currentTime {
|
||||||
return v.Value
|
return v.Value
|
||||||
}
|
}
|
||||||
s.itemsToEvict = append(s.itemsToEvict, k)
|
// Delete expired items
|
||||||
|
delete(s.store, k)
|
||||||
}
|
}
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
@ -54,17 +91,25 @@ func (s *SessionStore) Set(key string, subKey, value string, expiration int64) {
|
||||||
defer s.mutex.Unlock()
|
defer s.mutex.Unlock()
|
||||||
k := fmt.Sprintf("%s:%s", key, subKey)
|
k := fmt.Sprintf("%s:%s", key, subKey)
|
||||||
if _, ok := s.store[k]; !ok {
|
if _, ok := s.store[k]; !ok {
|
||||||
s.store[k] = &SessionEntry{
|
// check if there is enough space in cache
|
||||||
Value: value,
|
// else delete entries based on FIFO
|
||||||
ExpiresAt: expiration,
|
if len(s.store) == maxCacheSize {
|
||||||
// TODO add expire time
|
// remove older most entry
|
||||||
|
sortedKeys := []int64{}
|
||||||
|
for ik := range s.keyIndex {
|
||||||
|
sortedKeys = append(sortedKeys, ik)
|
||||||
|
}
|
||||||
|
sort.Slice(sortedKeys, func(i, j int) bool { return sortedKeys[i] < sortedKeys[j] })
|
||||||
|
itemToRemove := sortedKeys[0]
|
||||||
|
delete(s.store, s.keyIndex[itemToRemove])
|
||||||
|
delete(s.keyIndex, itemToRemove)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.store[k] = &SessionEntry{
|
s.store[k] = &SessionEntry{
|
||||||
Value: value,
|
Value: value,
|
||||||
ExpiresAt: expiration,
|
ExpiresAt: expiration,
|
||||||
// TODO add expire time
|
|
||||||
}
|
}
|
||||||
|
s.keyIndex[expiration] = k
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveAll all values for given key
|
// RemoveAll all values for given key
|
||||||
|
|
|
@ -42,17 +42,14 @@ func (c *provider) GetUserSession(userId, key string) (string, error) {
|
||||||
func (c *provider) DeleteUserSession(userId, key string) error {
|
func (c *provider) DeleteUserSession(userId, key string) error {
|
||||||
if err := c.store.Del(c.ctx, fmt.Sprintf("%s:%s", userId, constants.TokenTypeSessionToken+"_"+key)).Err(); err != nil {
|
if err := c.store.Del(c.ctx, fmt.Sprintf("%s:%s", userId, constants.TokenTypeSessionToken+"_"+key)).Err(); err != nil {
|
||||||
log.Debug("Error deleting user session from redis: ", err)
|
log.Debug("Error deleting user session from redis: ", err)
|
||||||
fmt.Println("Error deleting user session from redis: ", err, userId, constants.TokenTypeSessionToken, key)
|
|
||||||
// continue
|
// continue
|
||||||
}
|
}
|
||||||
if err := c.store.Del(c.ctx, fmt.Sprintf("%s:%s", userId, constants.TokenTypeAccessToken+"_"+key)).Err(); err != nil {
|
if err := c.store.Del(c.ctx, fmt.Sprintf("%s:%s", userId, constants.TokenTypeAccessToken+"_"+key)).Err(); err != nil {
|
||||||
log.Debug("Error deleting user session from redis: ", err)
|
log.Debug("Error deleting user session from redis: ", err)
|
||||||
fmt.Println("Error deleting user session from redis: ", err, userId, constants.TokenTypeAccessToken, key)
|
|
||||||
// continue
|
// continue
|
||||||
}
|
}
|
||||||
if err := c.store.Del(c.ctx, fmt.Sprintf("%s:%s", userId, constants.TokenTypeRefreshToken+"_"+key)).Err(); err != nil {
|
if err := c.store.Del(c.ctx, fmt.Sprintf("%s:%s", userId, constants.TokenTypeRefreshToken+"_"+key)).Err(); err != nil {
|
||||||
log.Debug("Error deleting user session from redis: ", err)
|
log.Debug("Error deleting user session from redis: ", err)
|
||||||
fmt.Println("Error deleting user session from redis: ", err, userId, constants.TokenTypeRefreshToken, key)
|
|
||||||
// continue
|
// continue
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in New Issue
Block a user