Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated rootpath validation method in zookeeper coordinator #422

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/internal/helpers/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func ValidateURL(rawURL string) bool {
// ValidateHostList returns true if the provided slice of strings can all be parsed by ValidateHostPort
func ValidateHostList(hosts []string) bool {
for _, host := range hosts {
if !ValidateHostPort(host, false) {
if !ValidateHostPort(host, true) {
return false
}
}
Expand Down
10 changes: 10 additions & 0 deletions core/internal/helpers/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ func TestValidateTopic(t *testing.T) {
}
}

func TestValidateFilename(t *testing.T) {
for i, testSet := range testTopics {
result := ValidateFilename(testSet.TestValue)
assert.Equalf(t, testSet.Result, result, "Test %v - Expected '%v' to return %v, not %v", i, testSet.TestValue, testSet.Result, result)
}
}

var testEmails = []TestSet{
{"[email protected]", true},
{"need@domain", false},
Expand Down Expand Up @@ -173,10 +180,13 @@ var testHostPorts = []TestSet{
{"host.example.com:23", true},
{"thissegmentiswaytoolongbecauseitshouldnotbemorethansixtythreecharacters.foo.com:36334", false},
{"underscores_are.not.valid.com:3453", false},
{":2453", true},
{"hostname:stringsNotValid", false},
}

func TestValidateHostList(t *testing.T) {
for i, testSet := range testHostPorts {
// Test allow blank hostname
result := ValidateHostList([]string{testSet.TestValue})
assert.Equalf(t, testSet.Result, result, "Test %v - Expected '%v' to return %v, not %v", i, testSet.TestValue, testSet.Result, result)
}
Expand Down
11 changes: 11 additions & 0 deletions core/internal/helpers/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func (z *BurrowZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.E
return z.client.GetW(path)
}

// Exists returns a boolean stating whether or not the specified path exists.
func (z *BurrowZookeeperClient) Exists(path string) (bool, *zk.Stat, error) {
return z.client.Exists(path)
}

// ExistsW returns a boolean stating whether or not the specified path exists. This method also sets a watch on the node
// (exists if it does not currently exist, or a data watch otherwise), providing an event channel that will receive a
// message when the watch fires
Expand Down Expand Up @@ -115,6 +120,12 @@ func (m *MockZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Eve
return args.Get(0).([]byte), args.Get(1).(*zk.Stat), args.Get(2).(<-chan zk.Event), args.Error(3)
}

// Exists mocks protocol.ZookeeperClient.Exists
func (m *MockZookeeperClient) Exists(path string) (bool, *zk.Stat, error) {
args := m.Called(path)
return args.Bool(0), args.Get(1).(*zk.Stat), args.Error(2)
}

// ExistsW mocks protocol.ZookeeperClient.ExistsW
func (m *MockZookeeperClient) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error) {
args := m.Called(path)
Expand Down
7 changes: 6 additions & 1 deletion core/internal/zookeeper/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,13 @@ func (zc *Coordinator) createRecursive(path string) error {

parts := strings.Split(path, "/")
for i := 2; i <= len(parts); i++ {
// If the rootpath exists, skip the Create process to avoid "zk: not authenticated" error
exist, _, errExists := zc.App.Zookeeper.Exists(strings.Join(parts[:i], "/"))
if (!exist) && (errExists == nil) {
continue
}
_, err := zc.App.Zookeeper.Create(strings.Join(parts[:i], "/"), []byte{}, 0, zk.WorldACL(zk.PermAll))
// Ignore when the node exists already
// Ignore when the node exists git already
if (err != nil) && (err != zk.ErrNodeExists) {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions core/internal/zookeeper/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ func TestCoordinator_StartStop(t *testing.T) {
return &mockClient, eventChan, nil
}

offsetStat := &zk.Stat{}
mockClient.On("Exists", "/test").Return(true, offsetStat, nil)
mockClient.On("Create", "/test", []byte{}, int32(0), zk.WorldACL(zk.PermAll)).Return("", zk.ErrNodeExists)
mockClient.On("Exists", "/test/path").Return(true, offsetStat, nil)
mockClient.On("Create", "/test/path", []byte{}, int32(0), zk.WorldACL(zk.PermAll)).Return("", zk.ErrNodeExists)
mockClient.On("Exists", "/test/path/burrow").Return(false, offsetStat, nil)
mockClient.On("Create", "/test/path/burrow", []byte{}, int32(0), zk.WorldACL(zk.PermAll)).Return("", nil)
mockClient.On("Close").Run(func(args mock.Arguments) { close(eventChan) }).Return()

Expand Down
6 changes: 5 additions & 1 deletion core/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,17 @@ type ZookeeperClient interface {
// the children of the specified path, providing an event channel that will receive a message when the watch fires
GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error)

// For the given path in Zookeeper, return a boolean stating whether or not the node exists.
// The method does not set watch on the node, but verifies existence of a node to avoid authentication error.
Exists(path string) (bool, *zk.Stat, error)

// For the given path in Zookeeper, return a boolean stating whether or not the node exists. This method also sets
// a watch on the node (exists if it does not currently exist, or a data watch otherwise), providing an event
// channel that will receive a message when the watch fires
ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error)

// Create makes a new ZNode at the specified path with the contents set to the data byte-slice. Flags can be
// provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is\
// provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is
// desired, specify
// zk.WorldACL(zk.PermAll)
Create(string, []byte, int32, []zk.ACL) (string, error)
Expand Down