Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download video to sync to local cache #113

Open
wants to merge 7 commits into
base: local
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 234 additions & 3 deletions local/local.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,253 @@
package local

import (
"fmt"
"errors"
"os"
"regexp"
"strings"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/abadojack/whatlanggo"

"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/tags_manager"
)

type SyncContext struct {
DryRun bool
KeepCache bool
ReflectStreams bool
TempDir string
LbrynetAddr string
ChannelID string
PublishBid float64
YouTubeSourceConfig *YouTubeSourceConfig
}

func (c *SyncContext) Validate() error {
if c.TempDir == "" {
return errors.New("No TempDir provided")
}
if c.LbrynetAddr == "" {
return errors.New("No Lbrynet address provided")
}
if c.ChannelID == "" {
return errors.New("No channel ID provided")
}
if c.PublishBid <= 0.0 {
return errors.New("Publish bid is not greater than zero")
}
return nil
}

type YouTubeSourceConfig struct {
YouTubeAPIKey string
}

var syncContext SyncContext

func AddCommand(rootCmd *cobra.Command) {
cmd := &cobra.Command{
Use: "local",
Short: "run a personal ytsync",
Run: localCmd,
Args: cobra.ExactArgs(1),
}
//cmd.Flags().StringVar(&cache, "cache", "", "path to cache")
cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream")
cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.")
cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.")
cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files")
cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim")
cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon")
cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to")

// For now, assume source is always YouTube
syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{}
cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.YouTubeAPIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key")
rootCmd.AddCommand(cmd)
}

func getEnvDefault(key, defaultValue string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return defaultValue
}

func localCmd(cmd *cobra.Command, args []string) {
fmt.Println("local")
err := syncContext.Validate()
if err != nil {
log.Error(err)
return
}
videoID := args[0]

log.Debugf("Running sync for video ID %s", videoID)

var publisher VideoPublisher
publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid)
if err != nil {
log.Errorf("Error setting up publisher: %v", err)
return
}

var videoSource VideoSource
if syncContext.YouTubeSourceConfig != nil {
videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig)
if err != nil {
log.Errorf("Error setting up video source: %v", err)
return
}
}

sourceVideo, err := videoSource.GetVideo(videoID)
if err != nil {
log.Errorf("Error getting source video: %v", err)
return
}

processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID)
if err != nil {
log.Errorf("Error processing source video for publishing: %v", err)
return
}

if syncContext.DryRun {
log.Infoln("This is a dry run. Nothing will be published.")
log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName)
log.Debugf("Object to be published: %v", processedVideo)

} else {
doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams)
if err != nil {
log.Errorf("Error publishing video: %v", err)
return
}

if syncContext.ReflectStreams {
err = <-doneReflectingCh
if err != nil {
log.Errorf("Error while wating for stream to reflect: %v", err)
}
} else {
log.Debugln("Not waiting for stream to reflect.")
}
}

if !syncContext.KeepCache {
log.Infof("Deleting local files.")
err = videoSource.DeleteLocalCache(videoID)
if err != nil {
log.Errorf("Error deleting local files for video %s: %v", videoID, err)
}
}
log.Info("Done")
}

type SourceVideo struct {
ID string
Title *string
Description *string
SourceURL string
Languages []string
Tags []string
ReleaseTime *int64
ThumbnailURL *string
FullLocalPath string
}

type PublishableVideo struct {
ID string
ClaimName string
Title string
Description string
SourceURL string
Languages []string
Tags []string
ReleaseTime int64
ThumbnailURL string
FullLocalPath string
}

func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) {
tags, err := tags_manager.SanitizeTags(source.Tags, channelID)
if err != nil {
log.Errorf("Error sanitizing tags: %v", err)
return nil, err
}

descriptionSample := ""
if source.Description != nil {
urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`)
descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "")
}
info := whatlanggo.Detect(descriptionSample)

title := ""
if source.Title != nil {
title = *source.Title
}
info2 := whatlanggo.Detect(title)
var languages []string = nil
if info.IsReliable() && info.Lang.Iso6391() != "" {
language := info.Lang.Iso6391()
languages = []string{language}
} else if info2.IsReliable() && info2.Lang.Iso6391() != "" {
language := info2.Lang.Iso6391()
languages = []string{language}
}

claimName := namer.NewNamer().GetNextName(title)

thumbnailURL := source.ThumbnailURL
if thumbnailURL == nil {
thumbnailURL = util.PtrToString("")
}

releaseTime := source.ReleaseTime
if releaseTime == nil {
releaseTime = util.PtrToInt64(time.Now().Unix())
}

processed := PublishableVideo {
ClaimName: claimName,
Title: title,
Description: getAbbrevDescription(source),
Languages: languages,
Tags: tags,
ReleaseTime: *releaseTime,
ThumbnailURL: *thumbnailURL,
FullLocalPath: source.FullLocalPath,
}

log.Debugf("Video prepared for publication: %v", processed)

return &processed, nil
}

func getAbbrevDescription(v SourceVideo) string {
if v.Description == nil {
return v.SourceURL
}

additionalDescription := "\n...\n" + v.SourceURL
maxLength := 2800 - len(additionalDescription)

description := strings.TrimSpace(*v.Description)
if len(description) > maxLength {
description = description[:maxLength]
}
return description + additionalDescription
}

type VideoSource interface {
GetVideo(id string) (*SourceVideo, error)
DeleteLocalCache(id string) error
}

type VideoPublisher interface {
Publish(video PublishableVideo, reflectStream bool) (chan error, error)
}
125 changes: 125 additions & 0 deletions local/localSDKPublisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package local

import (
"errors"
"sort"
"time"

log "github.com/sirupsen/logrus"

"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/util"
)

type LocalSDKPublisher struct {
channelID string
publishBid float64
lbrynet *jsonrpc.Client
}

func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) {
lbrynet := jsonrpc.NewClient(sdkAddr)
lbrynet.SetRPCTimeout(5 * time.Minute)

status, err := lbrynet.Status()
if err != nil {
return nil, err
}

if !status.IsRunning {
return nil, errors.New("SDK is not running")
}

// Should check to see if the SDK owns the channel

// Should check to see if wallet is unlocked
// but jsonrpc.Client doesn't have WalletStatus method
// so skip for now

// Should check to see if streams are configured to be reflected and warn if not
// but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected
// so use File.UploadingToReflector as a proxy for now

publisher := LocalSDKPublisher {
channelID: channelID,
publishBid: publishBid,
lbrynet: lbrynet,
}
return &publisher, nil
}

func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (chan error, error) {
streamCreateOptions := jsonrpc.StreamCreateOptions {
ClaimCreateOptions: jsonrpc.ClaimCreateOptions {
Title: &video.Title,
Description: &video.Description,
Languages: video.Languages,
ThumbnailURL: &video.ThumbnailURL,
Tags: video.Tags,
},
ReleaseTime: &video.ReleaseTime,
ChannelID: &p.channelID,
License: util.PtrToString("Copyrighted (contact publisher)"),
}

txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions)
if err != nil {
return nil, err
}

if !reflectStream {
return nil, nil
}

done := make(chan error, 1)
go func() {
for {
fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid)
if err != nil {
log.Errorf("Error finding file by txid: %v", err)
done <- err
return
}
if fileListResponse == nil {
log.Errorf("Could not find file in list with correct txid")
done <- err
return
}

fileStatus := fileListResponse.Items[fileIndex]
if fileStatus.IsFullyReflected {
log.Info("Stream is fully reflected")
break
}
if !fileStatus.UploadingToReflector {
log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.")
done <- errors.New("Stream is not being reflected (check lbrynet settings).")
return
}
log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress)
time.Sleep(5 * time.Second)
}
done <- nil
}()

return done, nil
}

// if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed
func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) {
response, err := client.FileList(0, 20)
for {
if err != nil {
log.Errorf("Error getting file list page: %v", err)
return nil, 0, err
}
index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid })
if index < len(response.Items) {
return response, index, nil
}
if response.Page >= response.TotalPages {
return nil, 0, nil
}
response, err = client.FileList(response.Page + 1, 20)
}
}
Loading