Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download video to sync to local cache #113

Open
wants to merge 7 commits into
base: local
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 227 additions & 3 deletions local/local.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,246 @@
package local

import (
"fmt"
"errors"
"os"
"regexp"
"strings"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/abadojack/whatlanggo"

"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/tags_manager"
)

type SyncContext struct {
DryRun bool
KeepCache bool
TempDir string
LbrynetAddr string
ChannelID string
PublishBid float64
YouTubeSourceConfig *YouTubeSourceConfig
}

func (c *SyncContext) Validate() error {
if c.TempDir == "" {
return errors.New("No TempDir provided")
}
if c.LbrynetAddr == "" {
return errors.New("No Lbrynet address provided")
}
if c.ChannelID == "" {
return errors.New("No channel ID provided")
}
if c.PublishBid <= 0.0 {
return errors.New("Publish bid is not greater than zero")
}
return nil
}

type YouTubeSourceConfig struct {
YouTubeAPIKey string
}

var syncContext SyncContext

func AddCommand(rootCmd *cobra.Command) {
cmd := &cobra.Command{
Use: "local",
Short: "run a personal ytsync",
Run: localCmd,
Args: cobra.ExactArgs(1),
}
//cmd.Flags().StringVar(&cache, "cache", "", "path to cache")
cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream")
cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.")
cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files")
cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim")
cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon")
cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to")

// For now, assume source is always YouTube
syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{}
cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.YouTubeAPIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key")
rootCmd.AddCommand(cmd)
}

func getEnvDefault(key, defaultValue string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return defaultValue
}

func localCmd(cmd *cobra.Command, args []string) {
fmt.Println("local")
err := syncContext.Validate()
if err != nil {
log.Error(err)
return
}
videoID := args[0]

log.Debugf("Running sync for video ID %s", videoID)

var publisher VideoPublisher
publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid)
if err != nil {
log.Errorf("Error setting up publisher: %v", err)
return
}

var videoSource VideoSource
if syncContext.YouTubeSourceConfig != nil {
videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig)
if err != nil {
log.Errorf("Error setting up video source: %v", err)
return
}
}

sourceVideo, err := videoSource.GetVideo(videoID)
if err != nil {
log.Errorf("Error getting source video: %v", err)
return
}

processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID)
if err != nil {
log.Errorf("Error processing source video for publishing: %v", err)
return
}

if syncContext.DryRun {
log.Infoln("This is a dry run. Nothing will be published.")
log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName)
log.Debugf("Object to be published: %v", processedVideo)

} else {
done, err := publisher.Publish(*processedVideo)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

id call this something other than done to make it clear its a channel that lets you know when the video is done reflecting

if err != nil {
log.Errorf("Error publishing video: %v", err)
return
}

err = <-done
if err != nil {
log.Errorf("Error while wating for stream to reflect: %v", err)
}
}

if !syncContext.KeepCache {
log.Infof("Deleting local files.")
err = videoSource.DeleteLocalCache(videoID)
if err != nil {
log.Errorf("Error deleting local files for video %s: %v", videoID, err)
}
}
log.Info("Done")
}

type SourceVideo struct {
ID string
Title *string
Description *string
SourceURL string
Languages []string
Tags []string
ReleaseTime *int64
ThumbnailURL *string
FullLocalPath string
}

type PublishableVideo struct {
ID string
ClaimName string
Title string
Description string
SourceURL string
Languages []string
Tags []string
ReleaseTime int64
ThumbnailURL string
FullLocalPath string
}

func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) {
tags, err := tags_manager.SanitizeTags(source.Tags, channelID)
if err != nil {
log.Errorf("Error sanitizing tags: %v", err)
return nil, err
}

descriptionSample := ""
if source.Description != nil {
urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`)
descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "")
}
info := whatlanggo.Detect(descriptionSample)

title := ""
if source.Title != nil {
title = *source.Title
}
info2 := whatlanggo.Detect(title)
var languages []string = nil
if info.IsReliable() && info.Lang.Iso6391() != "" {
language := info.Lang.Iso6391()
languages = []string{language}
} else if info2.IsReliable() && info2.Lang.Iso6391() != "" {
language := info2.Lang.Iso6391()
languages = []string{language}
}

claimName := namer.NewNamer().GetNextName(title)

thumbnailURL := source.ThumbnailURL
if thumbnailURL == nil {
thumbnailURL = util.PtrToString("")
}

releaseTime := source.ReleaseTime
if releaseTime == nil {
releaseTime = util.PtrToInt64(time.Now().Unix())
}

processed := PublishableVideo {
ClaimName: claimName,
Title: title,
Description: getAbbrevDescription(source),
Languages: languages,
Tags: tags,
ReleaseTime: *releaseTime,
ThumbnailURL: *thumbnailURL,
FullLocalPath: source.FullLocalPath,
}

log.Debugf("Video prepared for publication: %v", processed)

return &processed, nil
}

func getAbbrevDescription(v SourceVideo) string {
if v.Description == nil {
return v.SourceURL
}

maxLength := 2800
description := strings.TrimSpace(*v.Description)
additionalDescription := "\n" + v.SourceURL
if len(description) > maxLength {
description = description[:maxLength]
}
return description + "\n..." + additionalDescription
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit, but doesn't this potentially make the description longer than the max length?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was mostly ripped from getAbbrevDescription in sources/youtubeVideo.go. I didn't know exactly where the value of 2800 came from, so I just assumed the logic was correct. If it is supposed to be a limit on the total description length, I'll fix this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think that's an estimate of how long the description can get before we hit the max claim size limit. id still fix it so the math is right though :-)

}

type VideoSource interface {
GetVideo(id string) (*SourceVideo, error)
DeleteLocalCache(id string) error
}

type VideoPublisher interface {
Publish(video PublishableVideo) (chan error, error)
}
120 changes: 120 additions & 0 deletions local/localSDKPublisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package local

import (
"errors"
"sort"
"time"

log "github.com/sirupsen/logrus"

"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/util"
)

type LocalSDKPublisher struct {
channelID string
publishBid float64
lbrynet *jsonrpc.Client
}

func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) {
lbrynet := jsonrpc.NewClient(sdkAddr)
lbrynet.SetRPCTimeout(5 * time.Minute)

status, err := lbrynet.Status()
if err != nil {
return nil, err
}

if !status.IsRunning {
return nil, errors.New("SDK is not running")
}

// Should check to see if the SDK owns the channel

// Should check to see if wallet is unlocked
// but jsonrpc.Client doesn't have WalletStatus method
// so skip for now

// Should check to see if streams are configured to be reflected and warn if not
// but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected
// so use File.UploadingToReflector as a proxy for now

publisher := LocalSDKPublisher {
channelID: channelID,
publishBid: publishBid,
lbrynet: lbrynet,
}
return &publisher, nil
}

func (p *LocalSDKPublisher) Publish(video PublishableVideo) (chan error, error) {
streamCreateOptions := jsonrpc.StreamCreateOptions {
ClaimCreateOptions: jsonrpc.ClaimCreateOptions {
Title: &video.Title,
Description: &video.Description,
Languages: video.Languages,
ThumbnailURL: &video.ThumbnailURL,
Tags: video.Tags,
},
ReleaseTime: &video.ReleaseTime,
ChannelID: &p.channelID,
License: util.PtrToString("Copyrighted (contact publisher)"),
}

txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions)
if err != nil {
return nil, err
}

done := make(chan error, 1)
go func() {
for {
fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid)
if err != nil {
log.Errorf("Error finding file by txid: %v", err)
done <- err
return
}
if fileListResponse == nil {
log.Errorf("Could not find file in list with correct txid")
done <- err
return
}

fileStatus := fileListResponse.Items[fileIndex]
if fileStatus.IsFullyReflected {
log.Info("Stream is fully reflected")
break
}
if !fileStatus.UploadingToReflector {
log.Warn("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is an error (or an unexpected thing), it should prolly send an error to done, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thinking was that this may be intentional since it is driven by lbrynet settings. I could turn this into an error if that case is unlikely. I could also add a command line flag to indicate that streams won't be reflected.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea id either make it an option or just assume that all streams will be reflected. option is better but more work

break
}
log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress)
time.Sleep(5 * time.Second)
}
done <- nil
}()

return done, nil
}

// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) {
response, err := client.FileList(0, 20)
for {
if err != nil {
log.Errorf("Error getting file list page: %v", err)
return nil, 0, err
}
index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid })
if index < len(response.Items) {
return response, index, nil
}
if response.Page >= response.TotalPages {
return nil, 0, nil
}
response, err = client.FileList(response.Page + 1, 20)
}
}
Loading