Add over provision factor #229

Merged · 2 commits · Nov 10, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Support to delete downloaded backups after restore operation (k8s only).
- New parameter `overProvision`: when set, the available capacity on a node is calculated by taking into account
  the capacity already reserved in the pool by existing volumes.

## [1.2.3] - 2023-08-31

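To make the new `overProvision` parameter concrete, here is a minimal sketch of the calculation described in the changelog entry, using made-up numbers (none of these values come from this PR): the reported capacity of a pool becomes TotalCapacity * overProvision minus the capacity already reserved by existing volumes, and a pool whose reservations exceed that virtual capacity is skipped entirely.

```go
package main

import "fmt"

func main() {
	// Illustrative values only. LINSTOR reports capacities in KiB; 1 GiB = 1 << 20 KiB.
	total := int64(100 << 20)    // pool TotalCapacity: 100 GiB
	reserved := int64(250 << 20) // already reserved by existing volumes: 250 GiB
	factor := 3.0                // overProvision

	virtual := int64(float64(total) * factor) // 300 GiB of "virtual" capacity
	if reserved > virtual {
		fmt.Println("pool exhausted, contributes nothing")
		return
	}
	fmt.Printf("pool contributes %d GiB\n", (virtual-reserved)>>20) // 50 GiB
}
```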
31 changes: 28 additions & 3 deletions pkg/client/linstor.go
@@ -635,7 +635,7 @@ func (s *Linstor) Detach(ctx context.Context, volId, node string) error {
}

// CapacityBytes returns the amount of free space in the storage pool specified by the params and topology.
func (s *Linstor) CapacityBytes(ctx context.Context, storagePool string, segments map[string]string) (int64, error) {
func (s *Linstor) CapacityBytes(ctx context.Context, storagePool string, overProvision *float64, segments map[string]string) (int64, error) {
log := s.log.WithField("storage-pool", storagePool).WithField("segments", segments)

var requestedStoragePools []string
@@ -703,8 +703,33 @@ func (s *Linstor) CapacityBytes(ctx context.Context, storagePool string, segment
continue
}

if storagePool == "" || storagePool == sp.StoragePoolName {
log.Trace("adding storage pool capacity")
if sp.ProviderKind == lapi.DISKLESS {
log.Trace("not adding diskless pool")
continue
}

if storagePool != "" && storagePool != sp.StoragePoolName {
log.Trace("not a requested storage pool")
continue
}

if overProvision != nil {
virtualCapacity := float64(sp.TotalCapacity) * *overProvision

reservedCapacity, err := s.client.ReservedCapacity(ctx, sp.NodeName, sp.StoragePoolName)
if err != nil {
return 0, fmt.Errorf("failed to fetch reserved capacity: %w", err)
}

if reservedCapacity > int64(virtualCapacity) {
log.Trace("ignoring pool with exhausted capacity")
continue
}

log.WithField("add-capacity", int64(virtualCapacity)-reservedCapacity).Trace("adding storage pool capacity")
total += int64(virtualCapacity) - reservedCapacity
} else {
log.WithField("add-capacity", sp.FreeCapacity).Trace("adding storage pool capacity")
total += sp.FreeCapacity
}
}
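A usage sketch of the updated signature (imports elided; the pool name, segment key, and node name are illustrative, not values from this repository): passing nil for the factor keeps the previous behavior of summing each pool's FreeCapacity, while passing a factor switches to the virtual-capacity calculation above.

```go
// Sketch only: "thinpool", "example.com/hostname" and "node-a" are placeholders.
func exampleCapacity(ctx context.Context, s *client.Linstor) error {
	segments := map[string]string{"example.com/hostname": "node-a"}

	// nil factor: previous behavior, sum of FreeCapacity over matching pools.
	freeBytes, err := s.CapacityBytes(ctx, "thinpool", nil, segments)
	if err != nil {
		return err
	}

	// With a factor, each pool reports factor*TotalCapacity minus its reserved
	// capacity; pools whose reservations already exceed that are skipped.
	factor := 2.0
	overBytes, err := s.CapacityBytes(ctx, "thinpool", &factor, segments)
	if err != nil {
		return err
	}

	fmt.Println("free:", freeBytes, "over-provisioned:", overBytes)
	return nil
}
```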
2 changes: 1 addition & 1 deletion pkg/client/linstor_test.go
@@ -342,7 +342,7 @@ func TestLinstor_CapacityBytes(t *testing.T) {
testcase := &testcases[i]

t.Run(testcase.name, func(t *testing.T) {
cap, err := cl.CapacityBytes(context.Background(), testcase.storagePool, testcase.topology)
cap, err := cl.CapacityBytes(context.Background(), testcase.storagePool, nil, testcase.topology)
assert.NoError(t, err)
assert.Equal(t, testcase.expectedCapacity, cap)
})
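The existing table tests only cover the nil factor. A hedged sketch of a follow-up case that exercises a non-nil factor; the extra expected-value field is an assumption and is not part of this diff.

```go
// Hypothetical follow-up test case, not part of this PR.
factor := 2.0
cap, err := cl.CapacityBytes(context.Background(), testcase.storagePool, &factor, testcase.topology)
assert.NoError(t, err)
// The expected value depends on the mocked pools' TotalCapacity and their
// reserved capacity, so it would need a new field on the testcase struct.
assert.Equal(t, testcase.expectedOverProvisionedCapacity, cap)
```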
2 changes: 1 addition & 1 deletion pkg/client/mock.go
@@ -242,7 +242,7 @@ func (s *MockStorage) Status(ctx context.Context, volId string) ([]string, *csi.
return nodes, &csi.VolumeCondition{Abnormal: false, Message: "All replicas normal"}, nil
}

func (s *MockStorage) CapacityBytes(ctx context.Context, sp string, segments map[string]string) (int64, error) {
func (s *MockStorage) CapacityBytes(ctx context.Context, pool string, overProvision *float64, segments map[string]string) (int64, error) {
return 50000000, nil
}

2 changes: 1 addition & 1 deletion pkg/driver/driver.go
@@ -829,7 +829,7 @@ func (d Driver) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*
for _, segment := range accessibleSegments {
d.log.WithField("segment", segment).Debug("Checking capacity of segment")

bytes, err := d.Storage.CapacityBytes(ctx, params.StoragePool, segment)
bytes, err := d.Storage.CapacityBytes(ctx, params.StoragePool, params.OverProvision, segment)
if err != nil {
return nil, status.Errorf(codes.Internal, "%v", err)
}
124 changes: 110 additions & 14 deletions pkg/linstor/highlevelclient/high_level_client.go
@@ -25,6 +25,7 @@ import (
"sync"
"time"

lc "github.com/LINBIT/golinstor"
lapi "github.com/LINBIT/golinstor/client"
"github.com/container-storage-interface/spec/lib/go/csi"

@@ -52,6 +53,11 @@ func NewHighLevelClient(options ...lapi.Option) (*HighLevelClient, error) {
timeout: 1 * time.Minute,
}

c.Resources = &ResourceCacheProvider{
ResourceProvider: c.Resources,
timeout: 1 * time.Minute,
}

return &HighLevelClient{Client: c}, nil
}

@@ -158,6 +164,44 @@ func (c *HighLevelClient) NodesForTopology(ctx context.Context, segments map[str
return result, nil
}

func (c *HighLevelClient) ReservedCapacity(ctx context.Context, node, pool string) (int64, error) {
ress, err := c.Resources.GetResourceView(ctx, &lapi.ListOpts{
Node: []string{node},
StoragePool: []string{pool},
})
if err != nil {
return 0, err
}

var reserved int64

for i := range ress {
res := &ress[i]

// can never be too careful with LINSTOR filtering
if res.NodeName != node {
continue
}

for j := range res.Volumes {
vol := &res.Volumes[j]
if vol.StoragePoolName != pool {
continue
}

// Last layer is the storage layer
if len(vol.LayerDataList) > 0 {
storageVol, ok := vol.LayerDataList[len(vol.LayerDataList)-1].Data.(*lapi.StorageVolume)
if ok {
reserved += storageVol.UsableSizeKib
}
}
}
}

return reserved, nil
}

type NodeCacheProvider struct {
lapi.NodeProvider
timeout time.Duration
@@ -178,7 +222,7 @@ func (n *NodeCacheProvider) GetAll(ctx context.Context, opts ...*lapi.ListOpts)
if n.nodesUpdated.Add(n.timeout).After(now) {
return filter(n.nodes, func(node lapi.Node) string {
return node.Name
}, opts...), nil
}, nil, opts...), nil
}

nodes, err := n.NodeProvider.GetAll(ctx)
@@ -191,7 +235,7 @@ func (n *NodeCacheProvider) GetAll(ctx context.Context, opts ...*lapi.ListOpts)

return filter(n.nodes, func(node lapi.Node) string {
return node.Name
}, opts...), nil
}, nil, opts...), nil
}

func (n *NodeCacheProvider) GetStoragePoolView(ctx context.Context, opts ...*lapi.ListOpts) ([]lapi.StoragePool, error) {
@@ -201,9 +245,11 @@ func (n *NodeCacheProvider) GetStoragePoolView(ctx context.Context, opts ...*lap
now := time.Now()

if n.poolsUpdated.Add(n.timeout).After(now) {
return filter(n.pools, func(pool lapi.StoragePool) string {
return pool.NodeName
}, opts...), nil
return filter(n.pools,
func(pool lapi.StoragePool) string { return pool.NodeName },
func(pool lapi.StoragePool) string { return pool.StoragePoolName },
opts...,
), nil
}

pools, err := n.NodeProvider.GetStoragePoolView(ctx)
@@ -214,30 +260,80 @@ func (n *NodeCacheProvider) GetStoragePoolView(ctx context.Context, opts ...*lap
n.pools = pools
n.poolsUpdated = now

return filter(n.pools, func(pool lapi.StoragePool) string {
return pool.NodeName
}, opts...), nil
return filter(n.pools,
func(pool lapi.StoragePool) string { return pool.NodeName },
func(pool lapi.StoragePool) string { return pool.StoragePoolName },
opts...,
), nil
}

type ResourceCacheProvider struct {
lapi.ResourceProvider
timeout time.Duration
resourceViewMu sync.Mutex
resourceViewUpdated time.Time
resourceView []lapi.ResourceWithVolumes
}

func filter[T any](items []T, getNodeName func(T) string, opts ...*lapi.ListOpts) []T {
func (r *ResourceCacheProvider) GetResourceView(ctx context.Context, opts ...*lapi.ListOpts) ([]lapi.ResourceWithVolumes, error) {
r.resourceViewMu.Lock()
defer r.resourceViewMu.Unlock()

now := time.Now()

if r.resourceViewUpdated.Add(r.timeout).After(now) {
return filter(r.resourceView,
func(res lapi.ResourceWithVolumes) string { return res.NodeName },
func(res lapi.ResourceWithVolumes) string { return res.Props[lc.KeyStorPoolName] },
opts...,
), nil
}

view, err := r.ResourceProvider.GetResourceView(ctx)
if err != nil {
return nil, err
}

r.resourceView = view
r.resourceViewUpdated = now

return filter(r.resourceView,
func(res lapi.ResourceWithVolumes) string { return res.NodeName },
func(res lapi.ResourceWithVolumes) string { return res.Props[lc.KeyStorPoolName] },
opts...,
), nil
}

func filter[T any](items []T, getNodeName, getPoolName func(T) string, opts ...*lapi.ListOpts) []T {
filterNames := make(map[string]struct{})
filterPools := make(map[string]struct{})

for _, o := range opts {
for _, n := range o.Node {
filterNames[n] = struct{}{}
}
}

if len(filterNames) == 0 {
return items
for _, sp := range o.StoragePool {
filterPools[sp] = struct{}{}
}
}

var result []T

for _, item := range items {
if _, ok := filterNames[getNodeName(item)]; ok {
result = append(result, item)
if len(filterNames) > 0 {
if _, ok := filterNames[getNodeName(item)]; !ok {
continue
}
}

if len(filterPools) > 0 {
if _, ok := filterPools[getPoolName(item)]; !ok {
continue
}
}

result = append(result, item)
}

return result
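The cache providers keep the node, storage-pool, and (new in this PR) resource views for one minute and answer repeated calls by filtering the cached slice locally; the generic filter helper now takes both a node-name and a pool-name extractor so the same function serves all three views. A minimal in-package sketch of that pattern, with illustrative node and pool names (imports elided):

```go
// Sketch only: filter a cached storage-pool view the way the cache providers
// do on a cache hit, then look up the reserved capacity of the same pair.
// ReservedCapacity itself is answered from the one-minute resource-view cache.
func reservedForPool(ctx context.Context, hl *HighLevelClient, cached []lapi.StoragePool) (int64, error) {
	matching := filter(cached,
		func(p lapi.StoragePool) string { return p.NodeName },
		func(p lapi.StoragePool) string { return p.StoragePoolName },
		&lapi.ListOpts{Node: []string{"node-a"}, StoragePool: []string{"thinpool"}},
	)
	if len(matching) == 0 {
		return 0, nil // nothing known for this node/pool pair
	}

	return hl.ReservedCapacity(ctx, "node-a", "thinpool")
}
```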
13 changes: 13 additions & 0 deletions pkg/volume/parameter.go
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ const (
postmountxfsopts
resourcegroup
usepvcname
overprovision
)

// Parameters configuration for linstor volumes.
@@ -91,6 +92,10 @@ type Parameters struct {
Properties map[string]string
// UsePvcName derives the volume name from the PVC name+namespace, if that information is available.
UsePvcName bool
// OverProvision determines how much free capacity is reported.
// If set, free capacity is calculated by (TotalCapacity * OverProvision) - ReservedCapacity.
// If not set, the free capacity is taken directly from LINSTOR.
OverProvision *float64
}

const DefaultDisklessStoragePoolName = "DfltDisklessStorPool"
@@ -229,6 +234,14 @@ func NewParameters(params map[string]string, topologyPrefix string) (Parameters,
// This parameter was unused. It is just parsed to not break any old storage classes that might be using
// it. Storage sizes are handled via CSI requests directly.
log.Warnf("using useless parameter '%s'", rawkey)

case overprovision:
f, err := strconv.ParseFloat(v, 64)
if err != nil {
return p, err
}

p.OverProvision = &f
}
}

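A short sketch of how the new key travels through NewParameters; the key spelling follows the changelog entry, and the topology prefix and factor value are illustrative (imports elided).

```go
// Sketch only: prefix and value are placeholders.
params, err := volume.NewParameters(map[string]string{
	"overProvision": "2.5", // parsed with strconv.ParseFloat into Parameters.OverProvision
}, "example.com")
if err != nil {
	// non-numeric values are rejected by ParseFloat and surface here
	panic(err)
}

if params.OverProvision != nil {
	// reported capacity per pool: TotalCapacity * *params.OverProvision - ReservedCapacity
} else {
	// key not set: CapacityBytes keeps reporting each pool's FreeCapacity
}
```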
7 changes: 4 additions & 3 deletions pkg/volume/paramkey_enumer.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/volume/volume.go
@@ -103,7 +103,7 @@ type Querier interface {
// AllocationSizeKiB returns the number of KiB required to provision required bytes.
AllocationSizeKiB(requiredBytes, limitBytes int64) (int64, error)
// CapacityBytes returns the amount of free space, in bytes, in the storage pool specified by the params and topology.
CapacityBytes(ctx context.Context, pool string, segments map[string]string) (int64, error)
CapacityBytes(ctx context.Context, pool string, overProvision *float64, segments map[string]string) (int64, error)
}

// Mounter handles the filesystems located on volumes.