diff --git a/app/app.go b/app/app.go index bb6a60c..a5cd1d7 100644 --- a/app/app.go +++ b/app/app.go @@ -13,6 +13,17 @@ import ( bolt "go.etcd.io/bbolt" ) +// DataBuckets is a slice of all buckets that exist throught the project +var DataBuckets = []string{ + "ConfigBucket", // General configuration & caching + "PublisherBucket", // Local publishers -> rtmp stream keys + "RTMPLiveBucket", // Local publishers -> rtmp live stream status + "TwitchStreamBucket", // Local publishers -> twitch stream names + "TwitchLiveBucket", // Local publishers -> twitch live stream status + "TwitchNotificationBucket", // Local publishers -> twitch notification state + "StreamInfoBucket", // Local publishers -> generic stream information +} + func init() { debugFlag := flag.Bool("debug", false, "enable debug logging") envVarsFlag := flag.Bool("environment", false, "print environment variables with defaults") @@ -47,19 +58,10 @@ func init() { } defer db.Close() - bucketList := []string{ - "ConfigBucket", // General configuration & caching - "PublisherBucket", // Local publishers -> rtmp stream keys - "RTMPLiveBucket", // Local publishers -> rtmp live stream status - "TwitchStreamBucket", // Local publishers -> twitch stream names - "TwitchLiveBucket", // Local publishers -> twitch live stream status - "TwitchNotificationBucket", // Local publishers -> twitch notification state - } - - for b := range bucketList { - log.Debug("db: ensuring bucket exists: ", bucketList[b]) + for b := range DataBuckets { + log.Debug("db: ensuring bucket exists: ", DataBuckets[b]) db.Update(func(tx *bolt.Tx) error { - _, err := tx.CreateBucketIfNotExists([]byte(bucketList[b])) + _, err := tx.CreateBucketIfNotExists([]byte(DataBuckets[b])) if err != nil { return fmt.Errorf("error creating bucket: %s", err) } diff --git a/controllers/main.go b/controllers/main.go index 7cd9ea7..f717465 100644 --- a/controllers/main.go +++ b/controllers/main.go @@ -19,3 +19,22 @@ func (c *Controller) IndexHandler(w http.ResponseWriter, r *http.Request) { w.Write([]byte(`{"handler": "index"}`)) } + +func (c *Controller) setBucketValue(bucket, key, value string) error { + c.DB.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(bucket)) + err := b.Put([]byte(key), []byte(value)) + return err + }) + return nil +} + +func (c *Controller) getBucketValue(bucket, key string) ([]byte, error) { + var result []byte + err := c.DB.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(bucket)) + result = b.Get([]byte(key)) + return nil + }) + return result, err +} diff --git a/controllers/publish.go b/controllers/publish.go index d69f204..7d0e679 100644 --- a/controllers/publish.go +++ b/controllers/publish.go @@ -17,6 +17,7 @@ type Publisher struct { TwitchStream string `json:"twitch_stream"` TwitchLive string `json:"twitch_live"` TwitchNotification string `json:"-"` + StreamInfo string `json:"-"` } // IsValid perform basic validations on a publisher record @@ -33,15 +34,6 @@ func (p *Publisher) IsValid() error { return nil } -func (c *Controller) setLocalLive(p *Publisher, status string) error { - c.DB.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("RTMPLiveBucket")) - err := b.Put([]byte(p.Name), []byte(status)) - return err - }) - return nil -} - // IsTwitchLive returns a boolean based on string value of TwitchLive field func (p *Publisher) IsTwitchLive() bool { if p.TwitchLive != "" { @@ -50,26 +42,41 @@ func (p *Publisher) IsTwitchLive() bool { return false } -func (c *Controller) setTwitchLive(p *Publisher, status string) error { - c.DB.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("TwitchLiveBucket")) - err := b.Put([]byte(p.Name), []byte(status)) +// FetchPublisher populates the publisher struct from the database +func (c *Controller) FetchPublisher(p *Publisher) error { + var b []byte + var err error + b, err = c.getBucketValue("RTMPLiveBucket", p.Name) + if err != nil { return err - }) - return nil -} - -func (c *Controller) setTwitchNotification(p *Publisher, notification string) error { - c.DB.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("TwitchNotificationBucket")) - err := b.Put([]byte(p.Name), []byte(notification)) + } + p.RTMPLive = string(b) + b, err = c.getBucketValue("TwitchStreamBucket", p.Name) + if err != nil { return err - }) + } + p.TwitchStream = string(b) + b, err = c.getBucketValue("TwitchLiveBucket", p.Name) + if err != nil { + return err + } + p.TwitchLive = string(b) + b, err = c.getBucketValue("TwitchNotificationBucket", p.Name) + if err != nil { + return err + } + p.TwitchNotification = string(b) + b, err = c.getBucketValue("SteamInfoBucket", p.Name) + if err != nil { + return err + } + p.StreamInfo = string(b) + return nil } func (c *Controller) getAllPublisher() ([]Publisher, error) { - var stream, rtmpLive, twitchLive, notification []byte + var err error publishers := []Publisher{} c.DB.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte("PublisherBucket")) @@ -85,74 +92,35 @@ func (c *Controller) getAllPublisher() ([]Publisher, error) { for i := range publishers { p := &publishers[i] - c.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("RTMPLiveBucket")) - rtmpLive = b.Get([]byte(p.Name)) - return nil - }) - c.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("TwitchStreamBucket")) - stream = b.Get([]byte(p.Name)) - return nil - }) - c.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("TwitchLiveBucket")) - twitchLive = b.Get([]byte(p.Name)) - return nil - }) - c.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("TwitchNotificationBucket")) - notification = b.Get([]byte(p.Name)) - return nil - }) - p.RTMPLive = string(rtmpLive) - p.TwitchStream = string(stream) - p.TwitchLive = string(twitchLive) - p.TwitchNotification = string(notification) + err = c.FetchPublisher(p) + if err != nil { + return nil, err + } } return publishers, nil } func (c *Controller) getPublisher(name string) (Publisher, error) { - var key, stream, rtmpLive, twitchLive, notification []byte + var keyBytes []byte + var err error + p := Publisher{} - c.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("PublisherBucket")) - key = b.Get([]byte(name)) - return nil - }) - if len(key) < 1 { - return p, errors.New("publisher not found") + + keyBytes, err = c.getBucketValue("PublisherBucket", name) + if err != nil { + return p, err } + p.Key = string(keyBytes) - c.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("RTMPLiveBucket")) - rtmpLive = b.Get([]byte(p.Name)) - return nil - }) - c.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("TwitchStreamBucket")) - stream = b.Get([]byte(name)) - return nil - }) - c.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("TwitchLiveBucket")) - twitchLive = b.Get([]byte(name)) - return nil - }) - c.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("TwitchNotificationBucket")) - notification = b.Get([]byte(name)) - return nil - }) + if len(p.Key) < 1 { + return p, errors.New("publisher not found") + } - p.Name = name - p.Key = string(key) - p.RTMPLive = string(rtmpLive) - p.TwitchLive = string(twitchLive) - p.TwitchStream = string(stream) - p.TwitchNotification = string(notification) + err = c.FetchPublisher(&p) + if err != nil { + return p, err + } return p, nil } @@ -231,7 +199,7 @@ func (c *Controller) OnPublishHandler(w http.ResponseWriter, r *http.Request) { serverFQDN := c.Config.RTMPServerFQDN serverPort := c.Config.RTMPServerPort - err = c.setLocalLive(&p, "live") + err = c.setBucketValue("RTMPLiveBucket", p.Name, "live") if err != nil { log.Error("error enabling local live status") } @@ -265,7 +233,7 @@ func (c *Controller) OnPublishDoneHandler(w http.ResponseWriter, r *http.Request } log.Printf("on_publish_done authorized: %s", p.Name) - err = c.setLocalLive(&p, "") + err = c.setBucketValue("RTMPLiveBucket", p.Name, "") if err != nil { log.Error("error disabling local live status") } diff --git a/controllers/twitch.go b/controllers/twitch.go index b1e8700..ac80ca7 100644 --- a/controllers/twitch.go +++ b/controllers/twitch.go @@ -11,7 +11,6 @@ import ( "time" log "github.com/sirupsen/logrus" - bolt "go.etcd.io/bbolt" "golang.org/x/oauth2/clientcredentials" "golang.org/x/oauth2/twitch" ) @@ -54,11 +53,11 @@ type GameData struct { // Config struct. This is only called when the token is not set in Config func (c *Controller) getCachedAccessToken() (string, error) { var tokenBytes []byte - c.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("ConfigBucket")) - tokenBytes = b.Get([]byte("twitchAccessToken")) - return nil - }) + var err error + tokenBytes, err = c.getBucketValue("ConfigBucket", "twitchAccessToken") + if err != nil { + return "", err + } if len(tokenBytes) < 1 { return "", errors.New("cached twitch access token not found in db") } @@ -71,11 +70,10 @@ func (c *Controller) updateCachedAccessToken(accessToken string) error { if accessToken == "" { return errors.New("updateCachedAccessToken: no token provided") } - c.DB.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("ConfigBucket")) - err = b.Put([]byte("twitchAccessToken"), []byte(accessToken)) + err = c.setBucketValue("ConfigBucket", "twitchAccessToken", accessToken) + if err != nil { return err - }) + } return nil } @@ -309,7 +307,7 @@ func (c *Controller) updateLiveStatus(streams []StreamData) error { return err } - // mark previous live streams -> offline + // mark previous live streams -> offline and notify of stream info change for i := range publishers { live = false p := &publishers[i] @@ -318,12 +316,25 @@ func (c *Controller) updateLiveStatus(streams []StreamData) error { s := streams[x] if strings.ToLower(s.UserName) == strings.ToLower(p.TwitchStream) { live = true + // retieve stream info + g, err := c.getGame(s.GameID) + if err != nil { + return err + } + streamInfo := fmt.Sprintf("title: %s\ngame: %s", s.Title, g.Name) + if p.StreamInfo != "" && p.StreamInfo != streamInfo { + // streamer changed their stream info, set notification + notification := fmt.Sprintf("%s switched it up!\n%s", p.Name, p.StreamInfo) + c.setBucketValue("TwitchNotificationBucket", p.Name, notification) + } + p.StreamInfo = streamInfo } } if !live { - c.setTwitchLive(p, "") + c.setBucketValue("TwitchLiveBucket", p.Name, "") + c.setBucketValue("StreamInfoBucket", p.Name, "") notification := fmt.Sprintf(":checkered_flag: %s finished streaming on twitch", p.Name) - c.setTwitchNotification(p, notification) + c.setBucketValue("TwitchNotificationBucket", p.Name, notification) } } } @@ -338,14 +349,11 @@ func (c *Controller) updateLiveStatus(streams []StreamData) error { } if strings.ToLower(s.UserName) == strings.ToLower(p.TwitchStream) { if !p.IsTwitchLive() { - c.setTwitchLive(p, s.Type) + c.setBucketValue("TwitchLiveBucket", p.Name, s.Type) streamLink := fmt.Sprintf("https://twitch.tv/%s", p.TwitchStream) - g, err := c.getGame(s.GameID) - if err != nil { - return err - } - notification := fmt.Sprintf(":movie_camera: %s started a public stream on twitch!\ntitle: %s\ngame: %s\nwatch now: `%s`", p.Name, s.Title, g.Name, streamLink) - c.setTwitchNotification(p, notification) + notification := fmt.Sprintf(":movie_camera: %s started streaming on twitch!"+ + "\n%s\nwatch now: `%s`", p.Name, p.StreamInfo, streamLink) + c.setBucketValue("TwitchNotificationBucket", p.Name, notification) } } } @@ -379,7 +387,7 @@ func (c *Controller) processNotifications() error { } if p.TwitchNotification != "" { log.Debugf("resetting notification for %s (%s)", p.Name, p.TwitchStream) - err = c.setTwitchNotification(&p, "") + err = c.setBucketValue("TwitchNotificationBucket", p.Name, "") if err != nil { return err }