refactor custom code to get around import cycles
cbrady committed Sep 6, 2024
1 parent 98129c7 commit 8e8f8e0
Showing 18 changed files with 1,182 additions and 10 deletions.
19 changes: 19 additions & 0 deletions .fernignore
@@ -1 +1,20 @@
# Specify files that shouldn't be modified by Fern


buffer/*
cache/*
client/opts.go
client/schematic_client.go
client/*
client/schematic.go
flags/*
http/*
logger/*


generate.go
generate.sh
README.md


mocks/*
148 changes: 148 additions & 0 deletions buffer/buffer.go
@@ -0,0 +1,148 @@
package buffer

import (
    "context"
    "sync"
    "time"

    schematicgo "github.com/schematichq/schematic-go"
    "github.com/schematichq/schematic-go/events"
)

const defaultEventBufferPeriod = 5 * time.Second
const maxEvents = 100

type eventBuffer struct {
    // error logging channel
    errors chan error

    // buffer of events
    events []*schematicgo.CreateEventRequestBody

    // API client
    eventClient *events.Client

    // frequency to flush the buffer
    interval time.Duration

    // logger
    logger schematicgo.Logger

    // max number of events to store in buffer
    maxEvents int

    // mutexes for flushing and pushing to the buffer
    mutexFlush sync.Mutex
    mutexPush  sync.Mutex

    // channel to signal shutdown
    shutdown chan struct{}

    // whether to accept new events
    stopped bool
}

func NewEventBuffer(
    client *events.Client,
    errors chan error,
    logger schematicgo.Logger,
    _period *time.Duration,
) *eventBuffer {
    period := defaultEventBufferPeriod
    if _period != nil {
        period = *_period
    }

    buffer := &eventBuffer{
        events:      []*schematicgo.CreateEventRequestBody{},
        eventClient: client,
        errors:      errors,
        interval:    period,
        logger:      logger,
        maxEvents:   maxEvents,
        mutexFlush:  sync.Mutex{},
        mutexPush:   sync.Mutex{},
        shutdown:    make(chan struct{}),
    }

    // Start ticker to flush events periodically
    go buffer.periodicFlush()

    return buffer
}

func (b *eventBuffer) flush() {
    b.mutexFlush.Lock()
    defer b.mutexFlush.Unlock()

    if len(b.events) == 0 {
        return
    }

    events := make([]*schematicgo.CreateEventRequestBody, len(b.events))
    for i, event := range b.events {
        if event != nil {
            events[i] = event
        }
    }

    req := &schematicgo.CreateEventBatchRequestBody{
        Events: events,
    }

    if _, err := b.eventClient.CreateEventBatch(context.Background(), req); err != nil {
        b.errors <- err
    }

    b.events = b.events[:0]
}

func (b *eventBuffer) periodicFlush() {
    ticker := time.NewTicker(b.interval)
    defer ticker.Stop()

    for {
        select {
        case <-b.shutdown:
            // stop accepting new events
            b.stopped = true

            // flush any remaining events
            b.flush()

            return
        case <-ticker.C:
            b.flush()
        }
    }
}

func (b *eventBuffer) Push(event *schematicgo.CreateEventRequestBody) {
    if event == nil {
        return
    }

    if b.stopped {
        b.logger.Printf("ERROR: Event buffer is stopped, not accepting new events")
        return
    }

    b.mutexPush.Lock()
    defer b.mutexPush.Unlock()

    if len(b.events) >= b.maxEvents {
        b.flush()
    }

    b.events = append(b.events, event)
}

func (b *eventBuffer) Stop() {
    defer func() {
        if r := recover(); r != nil {
            b.logger.Printf("ERROR: Panic occurred while closing client %v", r)
        }
    }()

    close(b.shutdown)
}
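
For orientation, a minimal sketch (not part of this commit) of how the buffer above might be wired up. The eventsClient parameter, the use of a standard-library *log.Logger as the schematicgo.Logger, and the buffer import path are assumptions inferred from this diff, not confirmed API.

package example

import (
    "log"
    "os"
    "time"

    schematicgo "github.com/schematichq/schematic-go"
    "github.com/schematichq/schematic-go/buffer"
    "github.com/schematichq/schematic-go/events"
)

// WireEventBuffer sketches the intended call pattern for the event buffer.
// eventsClient is assumed to be a fully constructed *events.Client from the
// generated SDK, and *log.Logger is assumed to satisfy schematicgo.Logger.
func WireEventBuffer(eventsClient *events.Client) {
    errs := make(chan error, 10)
    logger := log.New(os.Stderr, "schematic: ", log.LstdFlags)

    // Flush every 10 seconds instead of the 5-second default.
    period := 10 * time.Second
    buf := buffer.NewEventBuffer(eventsClient, errs, logger, &period)

    // Surface errors reported by background flushes.
    go func() {
        for err := range errs {
            logger.Printf("flush error: %v", err)
        }
    }()

    // Events accumulate and are sent in batches, either on the ticker
    // or as soon as the buffer reaches its max size.
    buf.Push(&schematicgo.CreateEventRequestBody{})

    // Stop performs one final flush and rejects further pushes.
    buf.Stop()
}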
114 changes: 114 additions & 0 deletions cache/cache.go
@@ -0,0 +1,114 @@
package cache

import (
    "container/list"
    "context"
    "sync"
    "time"
)

const defaultCacheSize = 1000 // 1000 records
const defaultCacheTTL = 5 * time.Second

type cachedItem[T any] struct {
    key        string
    value      T
    expiration time.Time
}

type localCache[T any] struct {
    cache   map[string]*list.Element
    lruList *list.List
    maxSize int
    ttl     time.Duration
    mu      sync.RWMutex
}

func NewDefaultCache[T any]() *localCache[T] {
    return NewLocalCache[T](defaultCacheSize, defaultCacheTTL)
}

func NewLocalCache[T any](maxSize int, ttl time.Duration) *localCache[T] {
    return &localCache[T]{
        cache:   make(map[string]*list.Element),
        lruList: list.New(),
        maxSize: maxSize,
        ttl:     ttl,
    }
}

func (c *localCache[T]) Get(ctx context.Context, key string) (T, bool) {
    var empty T
    if c == nil || c.maxSize == 0 {
        return empty, false
    }

    c.mu.RLock()
    element, exists := c.cache[key]
    c.mu.RUnlock()

    if !exists {
        return empty, false
    }

    item := element.Value.(*cachedItem[T])

    // Check if the item has expired
    if time.Now().After(item.expiration) {
        c.mu.Lock()
        c.lruList.Remove(element)
        delete(c.cache, key)
        c.mu.Unlock()
        return empty, false
    }

    // Move the accessed item to the front of the list
    c.mu.Lock()
    c.lruList.MoveToFront(element)
    c.mu.Unlock()

    return item.value, true
}

func (c *localCache[T]) Set(ctx context.Context, key string, val T, ttlOverride *time.Duration) error {
    if c == nil || c.maxSize == 0 {
        return nil
    }

    ttl := c.ttl
    if ttlOverride != nil {
        ttl = *ttlOverride
    }

    c.mu.Lock()
    defer c.mu.Unlock()

    // If the key already exists, update it
    if element, exists := c.cache[key]; exists {
        c.lruList.MoveToFront(element)
        item := element.Value.(*cachedItem[T])
        item.value = val
        item.expiration = time.Now().Add(ttl)
        return nil
    }

    // If we're at capacity, remove the least recently used item
    if c.lruList.Len() >= c.maxSize {
        oldest := c.lruList.Back()
        if oldest != nil {
            c.lruList.Remove(oldest)
            delete(c.cache, oldest.Value.(*cachedItem[T]).key)
        }
    }

    // Add the new item
    item := &cachedItem[T]{
        key:        key,
        value:      val,
        expiration: time.Now().Add(ttl),
    }
    element := c.lruList.PushFront(item)
    c.cache[key] = element

    return nil
}
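
For reference, a small, self-contained sketch (not part of this commit) exercising the TTL + LRU behavior of the cache above. The import path github.com/schematichq/schematic-go/cache is inferred from the module path seen in buffer.go, and the keys, values, and sizes are illustrative only.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/schematichq/schematic-go/cache"
)

func main() {
    // A tiny LRU cache of strings: at most 2 entries, 1-minute default TTL.
    c := cache.NewLocalCache[string](2, time.Minute)
    ctx := context.Background()

    c.Set(ctx, "a", "alpha", nil)
    c.Set(ctx, "b", "beta", nil)

    // Override the default TTL for a single entry; adding a third entry
    // evicts "a", the least recently used key.
    shortTTL := 100 * time.Millisecond
    c.Set(ctx, "c", "gamma", &shortTTL)

    if _, ok := c.Get(ctx, "a"); !ok {
        fmt.Println(`"a" was evicted by the size limit`)
    }
    if v, ok := c.Get(ctx, "b"); ok {
        fmt.Println(`"b" is still cached:`, v)
    }

    // After the per-entry TTL elapses, "c" expires and is removed on read.
    time.Sleep(150 * time.Millisecond)
    if _, ok := c.Get(ctx, "c"); !ok {
        fmt.Println(`"c" expired via its per-entry TTL`)
    }
}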
