Skip to content

Commit

Permalink
Add initial support for converting static mode positions file. (#6765)
Browse files Browse the repository at this point in the history
* Add initial support for converting static mode positions file.

* add more doc

* clarify docs

* remove extra text

* add full unit test

* fix linting

* fix linting

* Update docs/sources/flow/reference/components/loki.source.file.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/flow/reference/components/loki.source.file.md

Co-authored-by: Clayton Cornell <[email protected]>

* instead of using os.temp use the test harness

* fix go.mod issue

* pr feedback

* pr feedback

* fix spelling mistake

* osx hates cleaning up files

---------

Co-authored-by: Clayton Cornell <[email protected]>
  • Loading branch information
mattdurham and clayton-cornell authored Mar 27, 2024
1 parent ef2f5f3 commit 5e037eb
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Main (unreleased)

- Add conversion from static to flow mode for `loki.source.windowsevent` via `legacy_bookmark_path`. (@mattdurham)

- Add ability to convert static mode positions file to `loki.source.file` compatible via `legacy_positions_file` argument. (@mattdurham)

### Features

- Added a new CLI flag `--stability.level` which defines the minimum stability
Expand Down
22 changes: 16 additions & 6 deletions docs/sources/flow/reference/components/loki.source.file.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,29 @@ log entries to the list of receivers passed in `forward_to`.

`loki.source.file` supports the following arguments:

| Name | Type | Description | Default | Required |
| --------------- | -------------------- | ----------------------------------------------------------------------------------- | ------- | -------- |
| `targets` | `list(map(string))` | List of files to read from. | | yes |
| `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes |
| `encoding` | `string` | The encoding to convert from when reading files. | `""` | no |
| `tail_from_end` | `bool` | Whether a log file should be tailed from the end if a stored position is not found. | `false` | no |
| Name | Type | Description | Default | Required |
| ------------------------| -------------------- | ----------------------------------------------------------------------------------- | ------- | -------- |
| `targets` | `list(map(string))` | List of files to read from. | | yes |
| `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes |
| `encoding` | `string` | The encoding to convert from when reading files. | `""` | no |
| `tail_from_end` | `bool` | Whether a log file should be tailed from the end if a stored position is not found. | `false` | no |
| `legacy_positions_file` | `string` | Allows conversion from legacy positions file. | `""` | no |

The `encoding` argument must be a valid [IANA encoding][] name. If not set, it
defaults to UTF-8.

You can use the `tail_from_end` argument when you want to tail a large file without reading its entire content.
When set to true, only new logs will be read, ignoring the existing ones.


{{< admonition type="note" >}}
The `legacy_positions_file` argument is used when you are transitioning from legacy. The legacy positions file will be rewritten into the new format.
This operation will only occur if the new positions file does not exist and the `legacy_positions_file` is valid.
Once converted successfully, the `legacy_positions_file` will be deleted.
If you add any labels before `loki.source.file`, then the positions file will conversion will not work.
The legacy positions file did not have a concept of labels in the positions file, so the conversion assumes no labels.
{{< /admonition >}}

## Blocks

The following blocks are supported inside the definition of `loki.source.file`:
Expand Down
63 changes: 63 additions & 0 deletions internal/component/common/loki/positions/positions.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,69 @@ type Positions interface {
Stop()
}

// LegacyFile is the copied struct for the static mode positions file.
type LegacyFile struct {
Positions map[string]string `yaml:"positions"`
}

// ConvertLegacyPositionsFile will convert the legacy positions file to the new format if:
// 1. There is no file at the newpath
// 2. There is a file at the legacy path and that it is valid yaml
// If all the above is true then the legacy file will be deleted.
func ConvertLegacyPositionsFile(legacyPath, newPath string, l log.Logger) {
legacyPositions := readLegacyFile(legacyPath, l)
// LegacyPositions did not exist or was invalid so return.
if legacyPositions == nil {
return
}
fi, err := os.Stat(newPath)
// If the newpath exists, then don't convert.
if err == nil && fi.Size() > 0 {
level.Info(l).Log("msg", "new positions file already exists", "path", newPath)
return
}

newPositions := make(map[Entry]string)
for k, v := range legacyPositions.Positions {
newPositions[Entry{
Path: k,
// This is a map of labels but must be an empty map since that is what the new positions expects.
Labels: "{}",
}] = v
}
// After conversion remove the file.
err = writePositionFile(newPath, newPositions)
if err != nil {
level.Error(l).Log("msg", "error writing new positions file from legacy", "path", newPath, "error", err)
}

// Finally remove the old path.
_ = os.Remove(legacyPath)
}

func readLegacyFile(legacyPath string, l log.Logger) *LegacyFile {
oldFile, err := os.Stat(legacyPath)
// If the old file doesn't exist or is empty then return early.
if err != nil || oldFile.Size() == 0 {
level.Info(l).Log("msg", "no legacy positions file found", "path", legacyPath)
return nil
}
// Try to read and parse the legacy file.
clean := filepath.Clean(legacyPath)
buf, err := os.ReadFile(clean)
if err != nil {
level.Error(l).Log("msg", "error reading legacy positions file", "path", clean, "error", err)
return nil
}
legacyPositions := &LegacyFile{}
err = yaml.UnmarshalStrict(buf, legacyPositions)
if err != nil {
level.Error(l).Log("msg", "error parsing legacy positions file", "path", clean, "error", err)
return nil
}
return legacyPositions
}

// New makes a new Positions.
func New(logger log.Logger, cfg Config) (Positions, error) {
positionData, err := readPositionsFile(cfg, logger)
Expand Down
80 changes: 80 additions & 0 deletions internal/component/common/loki/positions/positions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package positions

import (
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

util_log "github.com/grafana/loki/pkg/util/log"
)
Expand All @@ -37,6 +39,84 @@ func tempFilename(t *testing.T) string {
return name
}

func writeLegacy(t *testing.T, tmpDir string) string {
legacy := filepath.Join(tmpDir, "legacy")
legacyPositions := LegacyFile{
Positions: make(map[string]string),
}
// Filename and byte offset
legacyPositions.Positions["/tmp/random.log"] = "17623"
buf, err := yaml.Marshal(legacyPositions)
require.NoError(t, err)
err = os.WriteFile(legacy, buf, 0644)
require.NoError(t, err)
return legacy
}

func TestLegacyConversion(t *testing.T) {
tmpDir := t.TempDir()
legacy := writeLegacy(t, tmpDir)
positionsPath := filepath.Join(tmpDir, "positions")
ConvertLegacyPositionsFile(legacy, positionsPath, log.NewNopLogger())
ps, err := readPositionsFile(Config{
PositionsFile: positionsPath,
}, log.NewNopLogger())
require.NoError(t, err)
require.Len(t, ps, 1)
for k, v := range ps {
require.True(t, k.Path == "/tmp/random.log")
require.True(t, v == "17623")
}
// Ensure old file is deleted.
_, err = os.Stat(legacy)
require.True(t, os.IsNotExist(err))
}

func TestLegacyConversionWithNewFile(t *testing.T) {
tmpDir := t.TempDir()
legacy := writeLegacy(t, tmpDir)
// Write a new file.
positionsPath := filepath.Join(tmpDir, "positions")
err := writePositionFile(positionsPath, map[Entry]string{
{Path: "/tmp/newrandom.log", Labels: ""}: "100",
})
require.NoError(t, err)

// In this state nothing should be overwritten.
ConvertLegacyPositionsFile(legacy, positionsPath, log.NewNopLogger())
ps, err := readPositionsFile(Config{
PositionsFile: positionsPath,
}, log.NewNopLogger())
require.NoError(t, err)
require.Len(t, ps, 1)
for k, v := range ps {
require.True(t, k.Path == "/tmp/newrandom.log")
require.True(t, v == "100")
}
}

func TestLegacyConversionWithNoLegacyFile(t *testing.T) {
tmpDir := t.TempDir()
legacy := filepath.Join(tmpDir, "legacy")
positionsPath := filepath.Join(tmpDir, "positions")
// Write a new file.
err := writePositionFile(positionsPath, map[Entry]string{
{Path: "/tmp/newrandom.log", Labels: ""}: "100",
})
require.NoError(t, err)

ConvertLegacyPositionsFile(legacy, positionsPath, log.NewNopLogger())
ps, err := readPositionsFile(Config{
PositionsFile: positionsPath,
}, log.NewNopLogger())
require.NoError(t, err)
require.Len(t, ps, 1)
for k, v := range ps {
require.True(t, k.Path == "/tmp/newrandom.log")
require.True(t, v == "100")
}
}

func TestReadPositionsOK(t *testing.T) {
temp := tempFilename(t)
defer func() {
Expand Down
14 changes: 9 additions & 5 deletions internal/component/loki/source/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Arguments struct {
DecompressionConfig DecompressionConfig `river:"decompression,block,optional"`
FileWatch FileWatch `river:"file_watch,block,optional"`
TailFromEnd bool `river:"tail_from_end,attr,optional"`
LegacyPositionsFile string `river:"legacy_positions_file,attr,optional"`
}

type FileWatch struct {
Expand All @@ -70,9 +71,7 @@ type DecompressionConfig struct {
Format CompressionFormat `river:"format,attr"`
}

var (
_ component.Component = (*Component)(nil)
)
var _ component.Component = (*Component)(nil)

// Component implements the loki.source.file component.
type Component struct {
Expand All @@ -95,9 +94,14 @@ func New(o component.Options, args Arguments) (*Component, error) {
if err != nil && !os.IsExist(err) {
return nil, err
}
newPositionsPath := filepath.Join(o.DataPath, "positions.yml")
// Check to see if we can convert the legacy positions file to the new format.
if args.LegacyPositionsFile != "" {
positions.ConvertLegacyPositionsFile(args.LegacyPositionsFile, newPositionsPath, o.Logger)
}
positionsFile, err := positions.New(o.Logger, positions.Config{
SyncPeriod: 10 * time.Second,
PositionsFile: filepath.Join(o.DataPath, "positions.yml"),
PositionsFile: newPositionsPath,
IgnoreInvalidYaml: false,
ReadOnly: false,
})
Expand Down Expand Up @@ -197,7 +201,7 @@ func (c *Component) Update(args component.Arguments) error {
for _, target := range newArgs.Targets {
path := target[pathLabel]

var labels = make(model.LabelSet)
labels := make(model.LabelSet)
for k, v := range target {
if strings.HasPrefix(k, model.ReservedLabelPrefix) {
continue
Expand Down
Loading

0 comments on commit 5e037eb

Please sign in to comment.