Skip to content

Commit

Permalink
Linter fix
Browse files Browse the repository at this point in the history
  • Loading branch information
leighmacdonald committed Dec 28, 2024
2 parents a27a3a4 + fdc0521 commit c1268f3
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 224 deletions.
62 changes: 23 additions & 39 deletions internal/ban/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,43 @@ import (
"errors"
"log/slog"
"sync"
"time"

"github.com/leighmacdonald/gbans/internal/discord"
"github.com/leighmacdonald/gbans/internal/domain"
"github.com/leighmacdonald/gbans/internal/queue"
"github.com/leighmacdonald/gbans/pkg/log"
"github.com/leighmacdonald/steamid/v4/steamid"
"github.com/riverqueue/river"
)

type ExpirationArgs struct{}

func (args ExpirationArgs) Kind() string {
return "bans_expired"
}

func (args ExpirationArgs) InsertOpts() river.InsertOpts {
return river.InsertOpts{Queue: string(queue.Default), UniqueOpts: river.UniqueOpts{ByPeriod: time.Minute}}
}

func NewExpirationWorker(bansSteam domain.BanSteamUsecase, bansNet domain.BanNetUsecase, bansASN domain.BanASNUsecase,
bansPerson domain.PersonUsecase, notifications domain.NotificationUsecase, config domain.ConfigUsecase,
) *ExpirationWorker {
return &ExpirationWorker{
bansSteam: bansSteam,
bansNet: bansNet,
bansASN: bansASN,
bansPerson: bansPerson,
func NewExpirationMonitor(steam domain.BanSteamUsecase, net domain.BanNetUsecase, asn domain.BanASNUsecase,
person domain.PersonUsecase, notifications domain.NotificationUsecase, config domain.ConfigUsecase,
) *ExpirationMonitor {
return &ExpirationMonitor{
steam: steam,
net: net,
asn: asn,
person: person,
notifications: notifications,
config: config,
}
}

type ExpirationWorker struct {
river.WorkerDefaults[ExpirationArgs]
bansSteam domain.BanSteamUsecase
bansNet domain.BanNetUsecase
bansASN domain.BanASNUsecase
bansPerson domain.PersonUsecase
type ExpirationMonitor struct {
steam domain.BanSteamUsecase
net domain.BanNetUsecase
asn domain.BanASNUsecase
person domain.PersonUsecase
notifications domain.NotificationUsecase
config domain.ConfigUsecase
}

func (worker *ExpirationWorker) Work(ctx context.Context, _ *river.Job[ExpirationArgs]) error {
func (monitor *ExpirationMonitor) Update(ctx context.Context) {
waitGroup := &sync.WaitGroup{}
waitGroup.Add(3)

go func() {
defer waitGroup.Done()

expiredBans, errExpiredBans := worker.bansSteam.Expired(ctx)
expiredBans, errExpiredBans := monitor.steam.Expired(ctx)
if errExpiredBans != nil && !errors.Is(errExpiredBans, domain.ErrNoResult) {
slog.Error("Failed to get expired expiredBans", log.ErrAttr(errExpiredBans))

Expand All @@ -64,13 +50,13 @@ func (worker *ExpirationWorker) Work(ctx context.Context, _ *river.Job[Expiratio

for _, expiredBan := range expiredBans {
ban := expiredBan
if errDrop := worker.bansSteam.Delete(ctx, &ban, false); errDrop != nil {
if errDrop := monitor.steam.Delete(ctx, &ban, false); errDrop != nil {
slog.Error("Failed to drop expired expiredBan", log.ErrAttr(errDrop))

continue
}

person, errPerson := worker.bansPerson.GetPersonBySteamID(ctx, ban.TargetID)
person, errPerson := monitor.person.GetPersonBySteamID(ctx, ban.TargetID)
if errPerson != nil {
slog.Error("Failed to get expired Person", log.ErrAttr(errPerson))

Expand All @@ -82,9 +68,9 @@ func (worker *ExpirationWorker) Work(ctx context.Context, _ *river.Job[Expiratio
name = person.SteamID.String()
}

worker.notifications.Enqueue(ctx, domain.NewDiscordNotification(domain.ChannelBanLog, discord.BanExpiresMessage(ban, person, worker.config.ExtURL(ban))))
monitor.notifications.Enqueue(ctx, domain.NewDiscordNotification(domain.ChannelBanLog, discord.BanExpiresMessage(ban, person, monitor.config.ExtURL(ban))))

worker.notifications.Enqueue(ctx, domain.NewSiteUserNotification(
monitor.notifications.Enqueue(ctx, domain.NewSiteUserNotification(
[]steamid.SteamID{person.SteamID},
domain.SeverityInfo,
"Your mute/ban period has expired",
Expand All @@ -99,13 +85,13 @@ func (worker *ExpirationWorker) Work(ctx context.Context, _ *river.Job[Expiratio
go func() {
defer waitGroup.Done()

expiredNetBans, errExpiredNetBans := worker.bansNet.Expired(ctx)
expiredNetBans, errExpiredNetBans := monitor.net.Expired(ctx)
if errExpiredNetBans != nil && !errors.Is(errExpiredNetBans, domain.ErrNoResult) {
slog.Warn("Failed to get expired network bans", log.ErrAttr(errExpiredNetBans))
} else {
for _, expiredNetBan := range expiredNetBans {
expiredBan := expiredNetBan
if errDropBanNet := worker.bansNet.Delete(ctx, expiredNetBan.NetID, domain.RequestUnban{UnbanReasonText: "Expired"}, false); errDropBanNet != nil {
if errDropBanNet := monitor.net.Delete(ctx, expiredNetBan.NetID, domain.RequestUnban{UnbanReasonText: "Expired"}, false); errDropBanNet != nil {
if !errors.Is(errDropBanNet, domain.ErrNoResult) {
slog.Error("Failed to drop expired network expiredNetBan", log.ErrAttr(errDropBanNet))
}
Expand All @@ -119,12 +105,12 @@ func (worker *ExpirationWorker) Work(ctx context.Context, _ *river.Job[Expiratio
go func() {
defer waitGroup.Done()

expiredASNBans, errExpiredASNBans := worker.bansASN.Expired(ctx)
expiredASNBans, errExpiredASNBans := monitor.asn.Expired(ctx)
if errExpiredASNBans != nil && !errors.Is(errExpiredASNBans, domain.ErrNoResult) {
slog.Error("Failed to get expired asn bans", log.ErrAttr(errExpiredASNBans))
} else {
for _, expired := range expiredASNBans {
if errDropASN := worker.bansASN.Delete(ctx, expired.BanASNId, domain.RequestUnban{UnbanReasonText: "Expired"}); errDropASN != nil {
if errDropASN := monitor.asn.Delete(ctx, expired.BanASNId, domain.RequestUnban{UnbanReasonText: "Expired"}); errDropASN != nil {
slog.Error("Failed to drop expired asn ban", log.ErrAttr(errDropASN))
} else {
slog.Info("ASN ban expired", slog.Int64("ban_id", expired.BanASNId))
Expand All @@ -134,6 +120,4 @@ func (worker *ExpirationWorker) Work(ctx context.Context, _ *river.Job[Expiratio
}()

waitGroup.Wait()

return nil
}
28 changes: 0 additions & 28 deletions internal/blocklist/blocklist_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@ import (
"regexp"
"strings"
"sync"
"time"

"github.com/leighmacdonald/gbans/internal/domain"
"github.com/leighmacdonald/gbans/internal/httphelper"
"github.com/leighmacdonald/gbans/internal/queue"
"github.com/leighmacdonald/gbans/pkg/log"
"github.com/leighmacdonald/steamid/v4/steamid"
"github.com/riverqueue/river"
)

type blocklistUsecase struct {
Expand Down Expand Up @@ -313,28 +310,3 @@ func (b blocklistUsecase) DeleteCIDRBlockWhitelist(ctx context.Context, whitelis

return nil
}

type ListUpdaterArgs struct{}

func (args ListUpdaterArgs) Kind() string {
return "blocklist_update"
}

func (args ListUpdaterArgs) InsertOpts() river.InsertOpts {
return river.InsertOpts{Queue: string(queue.Default), UniqueOpts: river.UniqueOpts{ByPeriod: time.Hour * 24}}
}

func NewListUpdaterWorker(lists domain.BlocklistUsecase) *ListUpdaterWorker {
return &ListUpdaterWorker{lists: lists}
}

type ListUpdaterWorker struct {
river.WorkerDefaults[ListUpdaterArgs]
lists domain.BlocklistUsecase
}

func (worker *ListUpdaterWorker) Work(ctx context.Context, _ *river.Job[ListUpdaterArgs]) error {
worker.lists.Sync(ctx)

return nil
}
95 changes: 38 additions & 57 deletions internal/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,15 @@ func firstTimeSetup(ctx context.Context, persons domain.PersonUsecase, news doma
}

func createQueueWorkers(people domain.PersonUsecase, notifications domain.NotificationUsecase,
discordUC domain.DiscordUsecase, authRepo domain.AuthRepository, memberships *steamgroup.Memberships,
patreonUC domain.PatreonUsecase, bansSteam domain.BanSteamUsecase, bansNet domain.BanNetUsecase, bansASN domain.BanASNUsecase,
configUC domain.ConfigUsecase, demos domain.DemoUsecase, reports domain.ReportUsecase,
blocklists domain.BlocklistUsecase, discordOAuth domain.DiscordOAuthUsecase,
discordUC domain.DiscordUsecase, authRepo domain.AuthRepository,
patreonUC domain.PatreonUsecase, reports domain.ReportUsecase, discordOAuth domain.DiscordOAuthUsecase,
) *river.Workers {
workers := river.NewWorkers()

river.AddWorker[notification.SenderArgs](workers, notification.NewSenderWorker(people, notifications, discordUC))
river.AddWorker[auth.CleanupArgs](workers, auth.NewCleanupWorker(authRepo))
river.AddWorker[steamgroup.MembershipArgs](workers, steamgroup.NewMembershipWorker(memberships))
river.AddWorker[patreon.AuthUpdateArgs](workers, patreon.NewSyncWorker(patreonUC))
river.AddWorker[ban.ExpirationArgs](workers, ban.NewExpirationWorker(bansSteam, bansNet, bansASN, people, notifications, configUC))
river.AddWorker[demo.CleanupArgs](workers, demo.NewCleanupWorker(demos))
river.AddWorker[report.MetaInfoArgs](workers, report.NewMetaInfoWorker(reports))
river.AddWorker[blocklist.ListUpdaterArgs](workers, blocklist.NewListUpdaterWorker(blocklists))
river.AddWorker[person.ExpiredArgs](workers, person.NewExpiredWorker(people))
river.AddWorker[discord.TokenRefreshArgs](workers, discord.NewTokenRefreshWorker(discordOAuth))

return workers
Expand All @@ -128,55 +121,20 @@ func createPeriodicJobs() []*river.PeriodicJob {
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(6*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return steamgroup.MembershipArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return patreon.AuthUpdateArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(time.Minute),
func() (river.JobArgs, *river.InsertOpts) {
return ban.ExpirationArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(time.Hour*24),
func() (river.JobArgs, *river.InsertOpts) {
return demo.CleanupArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(24*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return report.MetaInfoArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(24*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return blocklist.ListUpdaterArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(time.Minute*5),
func() (river.JobArgs, *river.InsertOpts) {
return person.ExpiredArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(time.Hour*12),
func() (river.JobArgs, *river.InsertOpts) {
Expand Down Expand Up @@ -342,20 +300,15 @@ func serveCmd() *cobra.Command { //nolint:maintidx
go chatUsecase.Start(ctx)

forumUsecase := forum.NewForumUsecase(forum.NewForumRepository(dbConn), notificationUsecase)
go forumUsecase.Start(ctx)

metricsUsecase := metrics.NewMetricsUsecase(eventBroadcaster)
go metricsUsecase.Start(ctx)

go forumUsecase.Start(ctx)

newsUsecase := news.NewNewsUsecase(news.NewNewsRepository(dbConn))

patreonUsecase := patreon.NewPatreonUsecase(patreon.NewPatreonRepository(dbConn), configUsecase)

srcdsUsecase := srcds.NewSrcdsUsecase(srcds.NewRepository(dbConn), configUsecase, serversUC, personUsecase, reportUsecase, notificationUsecase, banUsecase)

wikiUsecase := wiki.NewWikiUsecase(wiki.NewWikiRepository(dbConn))

authRepo := auth.NewAuthRepository(dbConn)
authUsecase := auth.NewAuthUsecase(authRepo, configUsecase, personUsecase, banUsecase, serversUC)

Expand Down Expand Up @@ -428,17 +381,45 @@ func serveCmd() *cobra.Command { //nolint:maintidx
notificationUsecase,
discordUsecase,
authRepo,
steamgroup.NewMemberships(banGroupRepo),
patreonUsecase,
banUsecase,
banNetUsecase,
banASNUsecase,
configUsecase,
demos,
reportUsecase,
blocklistUsecase,
discordOAuthUsecase)

memberships := steamgroup.NewMemberships(banGroupRepo)
banExpirations := ban.NewExpirationMonitor(banUsecase, banNetUsecase, banASNUsecase, personUsecase, notificationUsecase, configUsecase)

go func() {
go memberships.Update(ctx)
go banExpirations.Update(ctx)
go blocklistUsecase.Sync(ctx)
go demos.Cleanup(ctx)

membershipsTicker := time.NewTicker(12 * time.Hour)
expirationsTicker := time.NewTicker(60 * time.Second)
reportIntoTicker := time.NewTicker(24 * time.Hour)
blocklistTicker := time.NewTicker(6 * time.Hour)
demoTicker := time.NewTicker(5 * time.Minute)

select {
case <-ctx.Done():
return
case <-membershipsTicker.C:
go memberships.Update(ctx)
case <-expirationsTicker.C:
go banExpirations.Update(ctx)
case <-reportIntoTicker.C:
go func() {
if errMeta := reportUsecase.GenerateMetaStats(ctx); errMeta != nil {
slog.Error("Failed to generate meta stats", log.ErrAttr(errMeta))
}
}()
case <-blocklistTicker.C:
go blocklistUsecase.Sync(ctx)
case <-demoTicker.C:
go demos.Cleanup(ctx)
}
}()

periodicJons := createPeriodicJobs()
queueClient, errClient := queue.Client(dbConn.Pool(), workers, periodicJons)
if errClient != nil {
Expand Down
27 changes: 0 additions & 27 deletions internal/demo/demo_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ import (
"github.com/dustin/go-humanize"
"github.com/gin-gonic/gin"
"github.com/leighmacdonald/gbans/internal/domain"
"github.com/leighmacdonald/gbans/internal/queue"
"github.com/leighmacdonald/gbans/pkg/fs"
"github.com/leighmacdonald/gbans/pkg/log"
"github.com/ricochet2200/go-disk-usage/du"
"github.com/riverqueue/river"
)

type demoUsecase struct {
Expand Down Expand Up @@ -369,28 +367,3 @@ func (d demoUsecase) RemoveOrphans(ctx context.Context) error {

return nil
}

type CleanupArgs struct{}

func (args CleanupArgs) Kind() string {
return "demo_cleanup"
}

func (args CleanupArgs) InsertOpts() river.InsertOpts {
return river.InsertOpts{Queue: string(queue.Default), UniqueOpts: river.UniqueOpts{ByPeriod: time.Hour * 24}}
}

func NewCleanupWorker(demos domain.DemoUsecase) *CleanupWorker {
return &CleanupWorker{demos: demos}
}

type CleanupWorker struct {
river.WorkerDefaults[CleanupArgs]
demos domain.DemoUsecase
}

func (worker *CleanupWorker) Work(ctx context.Context, _ *river.Job[CleanupArgs]) error {
worker.demos.Cleanup(ctx)

return nil
}
Loading

0 comments on commit c1268f3

Please sign in to comment.