Skip to content

Commit

Permalink
Merge pull request #12 from bcambl/stream_info
Browse files Browse the repository at this point in the history
Stream info
  • Loading branch information
bcambl authored Feb 21, 2021
2 parents e7d4795 + 550bde7 commit 0e4ed7c
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 116 deletions.
26 changes: 14 additions & 12 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ import (
bolt "go.etcd.io/bbolt"
)

// DataBuckets is a slice of all buckets that exist throught the project
var DataBuckets = []string{
"ConfigBucket", // General configuration & caching
"PublisherBucket", // Local publishers -> rtmp stream keys
"RTMPLiveBucket", // Local publishers -> rtmp live stream status
"TwitchStreamBucket", // Local publishers -> twitch stream names
"TwitchLiveBucket", // Local publishers -> twitch live stream status
"TwitchNotificationBucket", // Local publishers -> twitch notification state
"StreamInfoBucket", // Local publishers -> generic stream information
}

func init() {
debugFlag := flag.Bool("debug", false, "enable debug logging")
envVarsFlag := flag.Bool("environment", false, "print environment variables with defaults")
Expand Down Expand Up @@ -47,19 +58,10 @@ func init() {
}
defer db.Close()

bucketList := []string{
"ConfigBucket", // General configuration & caching
"PublisherBucket", // Local publishers -> rtmp stream keys
"RTMPLiveBucket", // Local publishers -> rtmp live stream status
"TwitchStreamBucket", // Local publishers -> twitch stream names
"TwitchLiveBucket", // Local publishers -> twitch live stream status
"TwitchNotificationBucket", // Local publishers -> twitch notification state
}

for b := range bucketList {
log.Debug("db: ensuring bucket exists: ", bucketList[b])
for b := range DataBuckets {
log.Debug("db: ensuring bucket exists: ", DataBuckets[b])
db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(bucketList[b]))
_, err := tx.CreateBucketIfNotExists([]byte(DataBuckets[b]))
if err != nil {
return fmt.Errorf("error creating bucket: %s", err)
}
Expand Down
19 changes: 19 additions & 0 deletions controllers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,22 @@ func (c *Controller) IndexHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"handler": "index"}`))

}

func (c *Controller) setBucketValue(bucket, key, value string) error {
c.DB.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bucket))
err := b.Put([]byte(key), []byte(value))
return err
})
return nil
}

func (c *Controller) getBucketValue(bucket, key string) ([]byte, error) {
var result []byte
err := c.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bucket))
result = b.Get([]byte(key))
return nil
})
return result, err
}
134 changes: 51 additions & 83 deletions controllers/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Publisher struct {
TwitchStream string `json:"twitch_stream"`
TwitchLive string `json:"twitch_live"`
TwitchNotification string `json:"-"`
StreamInfo string `json:"-"`
}

// IsValid perform basic validations on a publisher record
Expand All @@ -33,15 +34,6 @@ func (p *Publisher) IsValid() error {
return nil
}

func (c *Controller) setLocalLive(p *Publisher, status string) error {
c.DB.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("RTMPLiveBucket"))
err := b.Put([]byte(p.Name), []byte(status))
return err
})
return nil
}

// IsTwitchLive returns a boolean based on string value of TwitchLive field
func (p *Publisher) IsTwitchLive() bool {
if p.TwitchLive != "" {
Expand All @@ -50,26 +42,41 @@ func (p *Publisher) IsTwitchLive() bool {
return false
}

func (c *Controller) setTwitchLive(p *Publisher, status string) error {
c.DB.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("TwitchLiveBucket"))
err := b.Put([]byte(p.Name), []byte(status))
// FetchPublisher populates the publisher struct from the database
func (c *Controller) FetchPublisher(p *Publisher) error {
var b []byte
var err error
b, err = c.getBucketValue("RTMPLiveBucket", p.Name)
if err != nil {
return err
})
return nil
}

func (c *Controller) setTwitchNotification(p *Publisher, notification string) error {
c.DB.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("TwitchNotificationBucket"))
err := b.Put([]byte(p.Name), []byte(notification))
}
p.RTMPLive = string(b)
b, err = c.getBucketValue("TwitchStreamBucket", p.Name)
if err != nil {
return err
})
}
p.TwitchStream = string(b)
b, err = c.getBucketValue("TwitchLiveBucket", p.Name)
if err != nil {
return err
}
p.TwitchLive = string(b)
b, err = c.getBucketValue("TwitchNotificationBucket", p.Name)
if err != nil {
return err
}
p.TwitchNotification = string(b)
b, err = c.getBucketValue("SteamInfoBucket", p.Name)
if err != nil {
return err
}
p.StreamInfo = string(b)

return nil
}

func (c *Controller) getAllPublisher() ([]Publisher, error) {
var stream, rtmpLive, twitchLive, notification []byte
var err error
publishers := []Publisher{}
c.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("PublisherBucket"))
Expand All @@ -85,74 +92,35 @@ func (c *Controller) getAllPublisher() ([]Publisher, error) {

for i := range publishers {
p := &publishers[i]
c.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("RTMPLiveBucket"))
rtmpLive = b.Get([]byte(p.Name))
return nil
})
c.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("TwitchStreamBucket"))
stream = b.Get([]byte(p.Name))
return nil
})
c.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("TwitchLiveBucket"))
twitchLive = b.Get([]byte(p.Name))
return nil
})
c.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("TwitchNotificationBucket"))
notification = b.Get([]byte(p.Name))
return nil
})
p.RTMPLive = string(rtmpLive)
p.TwitchStream = string(stream)
p.TwitchLive = string(twitchLive)
p.TwitchNotification = string(notification)
err = c.FetchPublisher(p)
if err != nil {
return nil, err
}
}

return publishers, nil
}

func (c *Controller) getPublisher(name string) (Publisher, error) {
var key, stream, rtmpLive, twitchLive, notification []byte
var keyBytes []byte
var err error

p := Publisher{}
c.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("PublisherBucket"))
key = b.Get([]byte(name))
return nil
})
if len(key) < 1 {
return p, errors.New("publisher not found")

keyBytes, err = c.getBucketValue("PublisherBucket", name)
if err != nil {
return p, err
}
p.Key = string(keyBytes)

c.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("RTMPLiveBucket"))
rtmpLive = b.Get([]byte(p.Name))
return nil
})
c.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("TwitchStreamBucket"))
stream = b.Get([]byte(name))
return nil
})
c.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("TwitchLiveBucket"))
twitchLive = b.Get([]byte(name))
return nil
})
c.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("TwitchNotificationBucket"))
notification = b.Get([]byte(name))
return nil
})
if len(p.Key) < 1 {
return p, errors.New("publisher not found")
}

p.Name = name
p.Key = string(key)
p.RTMPLive = string(rtmpLive)
p.TwitchLive = string(twitchLive)
p.TwitchStream = string(stream)
p.TwitchNotification = string(notification)
err = c.FetchPublisher(&p)
if err != nil {
return p, err
}

return p, nil
}
Expand Down Expand Up @@ -231,7 +199,7 @@ func (c *Controller) OnPublishHandler(w http.ResponseWriter, r *http.Request) {
serverFQDN := c.Config.RTMPServerFQDN
serverPort := c.Config.RTMPServerPort

err = c.setLocalLive(&p, "live")
err = c.setBucketValue("RTMPLiveBucket", p.Name, "live")
if err != nil {
log.Error("error enabling local live status")
}
Expand Down Expand Up @@ -265,7 +233,7 @@ func (c *Controller) OnPublishDoneHandler(w http.ResponseWriter, r *http.Request
}
log.Printf("on_publish_done authorized: %s", p.Name)

err = c.setLocalLive(&p, "")
err = c.setBucketValue("RTMPLiveBucket", p.Name, "")
if err != nil {
log.Error("error disabling local live status")
}
Expand Down
50 changes: 29 additions & 21 deletions controllers/twitch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

log "github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"golang.org/x/oauth2/clientcredentials"
"golang.org/x/oauth2/twitch"
)
Expand Down Expand Up @@ -54,11 +53,11 @@ type GameData struct {
// Config struct. This is only called when the token is not set in Config
func (c *Controller) getCachedAccessToken() (string, error) {
var tokenBytes []byte
c.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("ConfigBucket"))
tokenBytes = b.Get([]byte("twitchAccessToken"))
return nil
})
var err error
tokenBytes, err = c.getBucketValue("ConfigBucket", "twitchAccessToken")
if err != nil {
return "", err
}
if len(tokenBytes) < 1 {
return "", errors.New("cached twitch access token not found in db")
}
Expand All @@ -71,11 +70,10 @@ func (c *Controller) updateCachedAccessToken(accessToken string) error {
if accessToken == "" {
return errors.New("updateCachedAccessToken: no token provided")
}
c.DB.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("ConfigBucket"))
err = b.Put([]byte("twitchAccessToken"), []byte(accessToken))
err = c.setBucketValue("ConfigBucket", "twitchAccessToken", accessToken)
if err != nil {
return err
})
}
return nil
}

Expand Down Expand Up @@ -309,7 +307,7 @@ func (c *Controller) updateLiveStatus(streams []StreamData) error {
return err
}

// mark previous live streams -> offline
// mark previous live streams -> offline and notify of stream info change
for i := range publishers {
live = false
p := &publishers[i]
Expand All @@ -318,12 +316,25 @@ func (c *Controller) updateLiveStatus(streams []StreamData) error {
s := streams[x]
if strings.ToLower(s.UserName) == strings.ToLower(p.TwitchStream) {
live = true
// retieve stream info
g, err := c.getGame(s.GameID)
if err != nil {
return err
}
streamInfo := fmt.Sprintf("title: %s\ngame: %s", s.Title, g.Name)
if p.StreamInfo != "" && p.StreamInfo != streamInfo {
// streamer changed their stream info, set notification
notification := fmt.Sprintf("%s switched it up!\n%s", p.Name, p.StreamInfo)
c.setBucketValue("TwitchNotificationBucket", p.Name, notification)
}
p.StreamInfo = streamInfo
}
}
if !live {
c.setTwitchLive(p, "")
c.setBucketValue("TwitchLiveBucket", p.Name, "")
c.setBucketValue("StreamInfoBucket", p.Name, "")
notification := fmt.Sprintf(":checkered_flag: %s finished streaming on twitch", p.Name)
c.setTwitchNotification(p, notification)
c.setBucketValue("TwitchNotificationBucket", p.Name, notification)
}
}
}
Expand All @@ -338,14 +349,11 @@ func (c *Controller) updateLiveStatus(streams []StreamData) error {
}
if strings.ToLower(s.UserName) == strings.ToLower(p.TwitchStream) {
if !p.IsTwitchLive() {
c.setTwitchLive(p, s.Type)
c.setBucketValue("TwitchLiveBucket", p.Name, s.Type)
streamLink := fmt.Sprintf("https://twitch.tv/%s", p.TwitchStream)
g, err := c.getGame(s.GameID)
if err != nil {
return err
}
notification := fmt.Sprintf(":movie_camera: %s started a public stream on twitch!\ntitle: %s\ngame: %s\nwatch now: `%s`", p.Name, s.Title, g.Name, streamLink)
c.setTwitchNotification(p, notification)
notification := fmt.Sprintf(":movie_camera: %s started streaming on twitch!"+
"\n%s\nwatch now: `%s`", p.Name, p.StreamInfo, streamLink)
c.setBucketValue("TwitchNotificationBucket", p.Name, notification)
}
}
}
Expand Down Expand Up @@ -379,7 +387,7 @@ func (c *Controller) processNotifications() error {
}
if p.TwitchNotification != "" {
log.Debugf("resetting notification for %s (%s)", p.Name, p.TwitchStream)
err = c.setTwitchNotification(&p, "")
err = c.setBucketValue("TwitchNotificationBucket", p.Name, "")
if err != nil {
return err
}
Expand Down

0 comments on commit 0e4ed7c

Please sign in to comment.