Skip to content

Commit

Permalink
rename s3s.App to s3s.Client
Browse files Browse the repository at this point in the history
  • Loading branch information
koluku committed Jun 16, 2023
1 parent f727abe commit 71e5e69
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 58 deletions.
4 changes: 2 additions & 2 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"github.com/pkg/errors"
)

func (app *App) GetS3Bucket(ctx context.Context) ([]string, error) {
func (c *Client) GetS3Bucket(ctx context.Context) ([]string, error) {
input := &s3.ListBucketsInput{}
output, err := app.s3.ListBuckets(ctx, input)
output, err := c.s3.ListBuckets(ctx, input)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
24 changes: 12 additions & 12 deletions cmd/s3s/delver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"github.com/pkg/errors"
)

func pathDelver(ctx context.Context, app *s3s.App, paths []string) ([]string, error) {
func pathDelver(ctx context.Context, client *s3s.Client, paths []string) ([]string, error) {
if len(paths) == 0 {
path, err := delveBucketList(ctx, app)
path, err := delveBucketList(ctx, client)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -29,7 +29,7 @@ func pathDelver(ctx context.Context, app *s3s.App, paths []string) ([]string, er
bucket = u.Hostname()
prefix = strings.TrimPrefix(u.Path, "/")

path, err := delvePrefix(ctx, app, bucket, prefix)
path, err := delvePrefix(ctx, client, bucket, prefix)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -40,8 +40,8 @@ func pathDelver(ctx context.Context, app *s3s.App, paths []string) ([]string, er
return paths, nil
}

func delveBucketList(ctx context.Context, app *s3s.App) (string, error) {
buckets, err := app.GetS3Bucket(ctx)
func delveBucketList(ctx context.Context, client *s3s.Client) (string, error) {
buckets, err := client.GetS3Bucket(ctx)
if err != nil {
return "", errors.WithStack(err)
}
Expand All @@ -56,11 +56,11 @@ func delveBucketList(ctx context.Context, app *s3s.App) (string, error) {
return "", errors.WithStack(err)
}

return delvePrefix(ctx, app, buckets[index], "")
return delvePrefix(ctx, client, buckets[index], "")
}

func delvePrefix(ctx context.Context, app *s3s.App, bucket string, prefix string) (string, error) {
s3Dirs, err := app.GetS3Dir(ctx, bucket, prefix)
func delvePrefix(ctx context.Context, client *s3s.Client, bucket string, prefix string) (string, error) {
s3Dirs, err := client.GetS3Dir(ctx, bucket, prefix)
if err != nil {
return "", errors.WithStack(err)
}
Expand Down Expand Up @@ -89,15 +89,15 @@ func delvePrefix(ctx context.Context, app *s3s.App, bucket string, prefix string
case 0:
parent = path.Join(prefix, "../")
if parent == "." {
return delvePrefix(ctx, app, bucket, "")
return delvePrefix(ctx, client, bucket, "")
}
if parent == ".." {
return delveBucketList(ctx, app)
return delveBucketList(ctx, client)
}
return delvePrefix(ctx, app, bucket, parent+"/")
return delvePrefix(ctx, client, bucket, parent+"/")
case 1:
return fmt.Sprintf("s3://%s/%s", bucket, prefix), nil
default:
return delvePrefix(ctx, app, bucket, s3Dirs[index])
return delvePrefix(ctx, client, bucket, s3Dirs[index])
}
}
13 changes: 11 additions & 2 deletions cmd/s3s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ var (
Version = "current"

// AWS
region string
profile string
region string

// S3 Select Query
queryStr string
Expand Down Expand Up @@ -62,6 +63,14 @@ func main() {
Version: Version,
Usage: "Easy S3 select like searching in directories",
Flags: []cli.Flag{
&cli.StringFlag{
Category: "AWS:",
Name: "profile",
Usage: "profile of aws credential",
Value: os.Getenv("AWS_Profile"),
DefaultText: "ENV[\"AWS_Profile\"]",
Destination: &region,
},
&cli.StringFlag{
Category: "AWS:",
Name: "region",
Expand Down Expand Up @@ -221,7 +230,7 @@ func cmd(ctx context.Context, paths []string) error {
}

// Initialize
app, err := s3s.NewApp(ctx, region, maxRetries, threadCount)
app, err := s3s.New(ctx, profile, region)
if err != nil {
return errors.WithStack(err)
}
Expand Down
20 changes: 10 additions & 10 deletions prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
"github.com/pkg/errors"
)

func (app *App) GetS3Dir(ctx context.Context, bucket string, prefix string) ([]string, error) {
func (c *Client) GetS3Dir(ctx context.Context, bucket string, prefix string) ([]string, error) {
input := &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
Delimiter: aws.String("/"),
}
pagenator := s3.NewListObjectsV2Paginator(app.s3, input)
pagenator := s3.NewListObjectsV2Paginator(c.s3, input)

var s3Keys []string
for pagenator.HasMorePages() {
Expand Down Expand Up @@ -63,13 +63,13 @@ type ObjectInfo struct {
Size int64
}

func (app *App) GetS3OneKey(ctx context.Context, bucket string, prefix string) (*ObjectInfo, error) {
func (c *Client) GetS3OneKey(ctx context.Context, bucket string, prefix string) (*ObjectInfo, error) {
input := &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
MaxKeys: 1,
}
output, err := app.s3.ListObjectsV2(ctx, input)
output, err := c.s3.ListObjectsV2(ctx, input)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -81,12 +81,12 @@ func (app *App) GetS3OneKey(ctx context.Context, bucket string, prefix string) (
}, nil
}

func (app *App) GetS3Keys(ctx context.Context, sender chan<- ObjectInfo, bucket string, prefix string, info *KeyInfo) error {
func (c *Client) GetS3Keys(ctx context.Context, sender chan<- ObjectInfo, bucket string, prefix string, info *KeyInfo) error {
input := &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
}
pagenator := s3.NewListObjectsV2Paginator(app.s3, input)
pagenator := s3.NewListObjectsV2Paginator(c.s3, input)

for pagenator.HasMorePages() {
output, err := pagenator.NextPage(ctx)
Expand All @@ -106,7 +106,7 @@ func (app *App) GetS3Keys(ctx context.Context, sender chan<- ObjectInfo, bucket
return nil
}

func (app *App) OptimizateALBPaths(ctx context.Context, paths []string, keyInfo *KeyInfo) ([]string, error) {
func (c *Client) OptimizateALBPaths(ctx context.Context, paths []string, keyInfo *KeyInfo) ([]string, error) {
if keyInfo.KeyType != KeyTypeALB || isTimeZeroRange(keyInfo.Since, keyInfo.Until) {
return nil, nil
}
Expand All @@ -121,7 +121,7 @@ func (app *App) OptimizateALBPaths(ctx context.Context, paths []string, keyInfo
var bucket, prefix string
bucket = u.Hostname()
prefix = strings.TrimPrefix(u.Path, "/")
oi, err := app.GetS3OneKey(ctx, bucket, prefix)
oi, err := c.GetS3OneKey(ctx, bucket, prefix)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down Expand Up @@ -157,7 +157,7 @@ func (app *App) OptimizateALBPaths(ctx context.Context, paths []string, keyInfo
return npaths, nil
}

func (app *App) OptimizateCFPaths(ctx context.Context, paths []string, keyInfo *KeyInfo) ([]string, error) {
func (c *Client) OptimizateCFPaths(ctx context.Context, paths []string, keyInfo *KeyInfo) ([]string, error) {
if keyInfo.KeyType != KeyTypeCF || isTimeZeroRange(keyInfo.Since, keyInfo.Until) {
return nil, nil
}
Expand All @@ -172,7 +172,7 @@ func (app *App) OptimizateCFPaths(ctx context.Context, paths []string, keyInfo *
var bucket, prefix string
bucket = u.Hostname()
prefix = strings.TrimPrefix(u.Path, "/")
oi, err := app.GetS3OneKey(ctx, bucket, prefix)
oi, err := c.GetS3OneKey(ctx, bucket, prefix)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
58 changes: 28 additions & 30 deletions s3s.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,47 @@ import (
"net/url"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

type App struct {
threadCount int
s3 *s3.Client
const (
DEFAULT_THREAD_COUNT = 150
)

type Client struct {
s3 *s3.Client
}

func NewApp(ctx context.Context, region string, maxRetries int, threadCount int) (*App, error) {
func New(ctx context.Context, profile string, region string) (*Client, error) {
cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
if err != nil {
return nil, errors.WithStack(err)
}

client := s3.NewFromConfig(cfg, func(o *s3.Options) {
o.RetryMaxAttempts = maxRetries
o.RetryMode = aws.RetryModeStandard
})
api := s3.NewFromConfig(cfg)

app := &App{
threadCount: threadCount,
s3: client,
client := &Client{
s3: api,
}

return app, nil
return client, nil
}

func (app *App) Run(ctx context.Context, paths []string, keyInfo *KeyInfo, queryStr string, queryInfo *QueryInfo) error {
func (c *Client) Run(ctx context.Context, paths []string, keyInfo *KeyInfo, queryStr string, queryInfo *QueryInfo) error {
switch keyInfo.KeyType {
case KeyTypeALB:
albPaths, err := app.OptimizateALBPaths(ctx, paths, keyInfo)
albPaths, err := c.OptimizateALBPaths(ctx, paths, keyInfo)
if err != nil {
return errors.WithStack(err)
}
if albPaths != nil {
paths = albPaths
}
case KeyTypeCF:
cfPaths, err := app.OptimizateCFPaths(ctx, paths, keyInfo)
cfPaths, err := c.OptimizateCFPaths(ctx, paths, keyInfo)
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -57,17 +55,17 @@ func (app *App) Run(ctx context.Context, paths []string, keyInfo *KeyInfo, query
}
}

ch := make(chan ObjectInfo, app.threadCount)
ch := make(chan ObjectInfo, DEFAULT_THREAD_COUNT)
eg, egctx := errgroup.WithContext(ctx)

eg.Go(func() error {
if err := app.getBucketKeys(egctx, ch, paths, keyInfo); err != nil {
if err := c.getBucketKeys(egctx, ch, paths, keyInfo); err != nil {
return errors.WithStack(err)
}
return nil
})
eg.Go(func() error {
if err := app.execS3Select(egctx, ch, queryStr, queryInfo); err != nil {
if err := c.execS3Select(egctx, ch, queryStr, queryInfo); err != nil {
return errors.WithStack(err)
}
return nil
Expand All @@ -80,18 +78,18 @@ func (app *App) Run(ctx context.Context, paths []string, keyInfo *KeyInfo, query
return nil
}

func (app *App) DryRun(ctx context.Context, paths []string, keyInfo *KeyInfo, queryStr string, queryInfo *QueryInfo) (int64, int, error) {
func (c *Client) DryRun(ctx context.Context, paths []string, keyInfo *KeyInfo, queryStr string, queryInfo *QueryInfo) (int64, int, error) {
switch keyInfo.KeyType {
case KeyTypeALB:
albPaths, err := app.OptimizateALBPaths(ctx, paths, keyInfo)
albPaths, err := c.OptimizateALBPaths(ctx, paths, keyInfo)
if err != nil {
return 0, 0, errors.WithStack(err)
}
if albPaths != nil {
paths = albPaths
}
case KeyTypeCF:
cfPaths, err := app.OptimizateCFPaths(ctx, paths, keyInfo)
cfPaths, err := c.OptimizateCFPaths(ctx, paths, keyInfo)
if err != nil {
return 0, 0, errors.WithStack(err)
}
Expand All @@ -102,11 +100,11 @@ func (app *App) DryRun(ctx context.Context, paths []string, keyInfo *KeyInfo, qu

var scanByte int64
var count int
ch := make(chan ObjectInfo, app.threadCount)
ch := make(chan ObjectInfo, DEFAULT_THREAD_COUNT)

eg, egctx := errgroup.WithContext(ctx)
eg.Go(func() error {
if err := app.getBucketKeys(egctx, ch, paths, keyInfo); err != nil {
if err := c.getBucketKeys(egctx, ch, paths, keyInfo); err != nil {
return errors.WithStack(err)
}
return nil
Expand All @@ -126,11 +124,11 @@ func (app *App) DryRun(ctx context.Context, paths []string, keyInfo *KeyInfo, qu
return scanByte, count, nil
}

func (app *App) getBucketKeys(ctx context.Context, ch chan<- ObjectInfo, paths []string, info *KeyInfo) error {
func (c *Client) getBucketKeys(ctx context.Context, ch chan<- ObjectInfo, paths []string, info *KeyInfo) error {
defer close(ch)

eg, egctx := errgroup.WithContext(ctx)
eg.SetLimit(app.threadCount)
eg.SetLimit(DEFAULT_THREAD_COUNT)
for _, path := range paths {
path := path
eg.Go(func() error {
Expand All @@ -142,7 +140,7 @@ func (app *App) getBucketKeys(ctx context.Context, ch chan<- ObjectInfo, paths [
bucket = u.Hostname()
prefix = strings.TrimPrefix(u.Path, "/")

if app.GetS3Keys(egctx, ch, bucket, prefix, info); err != nil {
if c.GetS3Keys(egctx, ch, bucket, prefix, info); err != nil {
return errors.WithStack(err)
}
return nil
Expand All @@ -156,10 +154,10 @@ func (app *App) getBucketKeys(ctx context.Context, ch chan<- ObjectInfo, paths [
return nil
}

func (app *App) execS3Select(ctx context.Context, reciever <-chan ObjectInfo, queryStr string, info *QueryInfo) error {
func (c *Client) execS3Select(ctx context.Context, reciever <-chan ObjectInfo, queryStr string, info *QueryInfo) error {
var count int
eg, egctx := errgroup.WithContext(ctx)
eg.SetLimit(app.threadCount)
eg.SetLimit(DEFAULT_THREAD_COUNT)

for r := range reciever {
bucket := r.Bucket
Expand Down Expand Up @@ -200,7 +198,7 @@ func (app *App) execS3Select(ctx context.Context, reciever <-chan ObjectInfo, qu
}

eg.Go(func() error {
result, err := app.S3Select(egctx, input, info)
result, err := c.S3Select(egctx, input, info)
if err != nil {
return errors.WithStack(err)
}
Expand Down
19 changes: 17 additions & 2 deletions s3select.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,31 @@ const (
FormatTypeCFLogs
)

func (f FormatType) String() string {
switch f {
case FormatTypeJSON:
return "JSON"
case FormatTypeCSV:
return "CSV"
case FormatTypeALBLogs:
return "ALB"
case FormatTypeCFLogs:
return "CloudFront"
default:
return ""
}
}

type QueryInfo struct {
FormatType FormatType
FieldDelimiter string
RecordDelimiter string
IsCountMode bool
}

func (app *App) S3Select(ctx context.Context, input Querying, info *QueryInfo) (*Result, error) {
func (c *Client) S3Select(ctx context.Context, input Querying, info *QueryInfo) (*Result, error) {
params := input.toParameter()
resp, err := app.s3.SelectObjectContent(ctx, params)
resp, err := c.s3.SelectObjectContent(ctx, params)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down

0 comments on commit 71e5e69

Please sign in to comment.