Add support for HDFS federation #227

Open · wants to merge 1 commit into master

36 changes: 27 additions & 9 deletions client.go
@@ -72,11 +72,13 @@ type ClientOptions struct {

// ClientOptionsFromConf attempts to load any relevant configuration options
// from the given Hadoop configuration and create a ClientOptions struct
// suitable for creating a Client. Currently this sets the following fields
// on the resulting ClientOptions:
// suitable for creating a Client for the given nameservice. Currently this
// sets the following fields on the resulting ClientOptions:
//
// // Determined by fs.defaultFS (or the deprecated fs.default.name), or
// // fields beginning with dfs.namenode.rpc-address.
// // Determined by the value of the property named
// // "dfs.namenode.rpc-address."+ns if using a non-HA federated
// // architecture, or "dfs.namenode.rpc-address."+ns+"."+nnid for each nnid
// // found in "dfs.ha.namenodes."+ns
// Addresses []string
//
// // Determined by dfs.client.use.datanode.hostname.
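For illustration, here is a minimal sketch of the address lookup described in the comment above, building a hadoopconf.HadoopConf inline (it is a plain map of property names to values). The nameservice names, hosts, and the v2 module path are assumptions for the example, not part of this change:

```go
package main

import (
	"fmt"

	"github.com/colinmarc/hdfs/v2"
	"github.com/colinmarc/hdfs/v2/hadoopconf"
)

func main() {
	conf := hadoopconf.HadoopConf{
		"dfs.nameservices": "ha-ns,simple-ns",

		// HA nameservice: one rpc-address per logical namenode id.
		"dfs.ha.namenodes.ha-ns":             "nn1,nn2",
		"dfs.namenode.rpc-address.ha-ns.nn1": "nn1.example.com:8020",
		"dfs.namenode.rpc-address.ha-ns.nn2": "nn2.example.com:8020",

		// Non-HA federated nameservice: a single rpc-address.
		"dfs.namenode.rpc-address.simple-ns": "nn3.example.com:8020",
	}

	fmt.Println(hdfs.ClientOptionsFromConf(conf, "ha-ns").Addresses)
	// [nn1.example.com:8020 nn2.example.com:8020]

	fmt.Println(hdfs.ClientOptionsFromConf(conf, "simple-ns").Addresses)
	// [nn3.example.com:8020]
}
```
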
@@ -95,14 +97,30 @@ type ClientOptions struct {
// actually configured, you should check for whether KerberosClient is set in
// the resulting ClientOptions before proceeding:
//
// options := ClientOptionsFromConf(conf)
// options := ClientOptionsFromConf(conf, "mynameservice")
// if options.KerberosClient != nil {
// // Replace with a valid credentialed client.
// options.KerberosClient = getKerberosClient()
// }
func ClientOptionsFromConf(conf hadoopconf.HadoopConf) ClientOptions {
options := ClientOptions{Addresses: conf.Namenodes()}
func ClientOptionsFromConf(conf hadoopconf.HadoopConf, ns string) ClientOptions {
options := commonClientOptionsFromConf(conf)
options.Addresses = conf.Namenodes(ns)
return options
}

// DefaultClientOptionsFromConf behaves similarly to ClientOptionsFromConf
// except it uses the nameservice defined in fs.defaultFS (or the deprecated
// fs.default.name) for finding the namenode addresses. If both of these
// properties are absent, then it uses the address defined by
// dfs.namenode.rpc-address.
func DefaultClientOptionsFromConf(conf hadoopconf.HadoopConf) ClientOptions {
options := commonClientOptionsFromConf(conf)
options.Addresses = conf.DefaultNamenodes()
return options
}

func commonClientOptionsFromConf(conf hadoopconf.HadoopConf) ClientOptions {
options := ClientOptions{}
options.UseDatanodeHostname = (conf["dfs.client.use.datanode.hostname"] == "true")

if strings.ToLower(conf["hadoop.security.authentication"]) == "kerberos" {
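For reference, an end-to-end sketch of how the constructors added in this hunk might be wired into NewClient; the module path, error handling, and the commented-out Kerberos helper are assumptions, not part of this diff:

```go
package main

import (
	"log"

	"github.com/colinmarc/hdfs/v2"
	"github.com/colinmarc/hdfs/v2/hadoopconf"
)

func main() {
	conf, err := hadoopconf.LoadFromEnvironment()
	if err != nil {
		log.Fatalf("couldn't load ambient config: %s", err)
	}

	// Use the nameservice from fs.defaultFS; to target another federated
	// nameservice, call hdfs.ClientOptionsFromConf(conf, "other-ns") instead.
	options := hdfs.DefaultClientOptionsFromConf(conf)
	if options.Addresses == nil {
		log.Fatal("no namenode addresses found in the ambient config")
	}

	if options.KerberosClient != nil {
		// Kerberos is enabled in the config. As the doc comment above notes,
		// a real, credentialed client has to be swapped in here, e.g.:
		// options.KerberosClient = getKerberosClient()
	}

	client, err := hdfs.NewClient(options)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}
```
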
@@ -156,7 +174,7 @@ func NewClient(options ClientOptions) (*Client, error) {
// (including the address(es) of the namenode(s), if an empty string is passed)
// will be loaded from the Hadoop configuration present at HADOOP_CONF_DIR or
// HADOOP_HOME, as specified by hadoopconf.LoadFromEnvironment and
// ClientOptionsFromConf.
// DefaultClientOptionsFromConf.
//
// Note, however, that New will not attempt any Kerberos authentication; use
// NewClient if you need that.
@@ -166,7 +184,7 @@ func New(address string) (*Client, error) {
return nil, err
}

options := ClientOptionsFromConf(conf)
options := DefaultClientOptionsFromConf(conf)
if address != "" {
options.Addresses = strings.Split(address, ",")
}
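A brief usage sketch for New after this change: an empty address falls back to DefaultClientOptionsFromConf, while a comma-separated list (hosts invented here) supplies the addresses directly:

```go
package main

import (
	"log"

	"github.com/colinmarc/hdfs/v2"
)

func main() {
	// With an empty address, the namenodes come from the Hadoop config at
	// HADOOP_CONF_DIR or HADOOP_HOME via DefaultClientOptionsFromConf
	// (no Kerberos authentication is attempted).
	client, err := hdfs.New("")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// A comma-separated list would instead override the configured addresses:
	//   client, err = hdfs.New("nn1.example.com:8020,nn2.example.com:8020")
}
```
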
4 changes: 2 additions & 2 deletions client_test.go
@@ -41,7 +41,7 @@ func getClientForUser(t *testing.T, username string) *Client {
t.Fatal("Couldn't load ambient config", err)
}

options := ClientOptionsFromConf(conf)
options := DefaultClientOptionsFromConf(conf)
if options.Addresses == nil {
t.Fatal("Missing namenode addresses in ambient config")
}
@@ -149,7 +149,7 @@ func TestNewWithMultipleNodes(t *testing.T) {
t.Fatal("Couldn't load ambient config", err)
}

nns := conf.Namenodes()
nns := conf.DefaultNamenodes()

nns = append([]string{"localhost:100"}, nns...)
_, err = NewClient(ClientOptions{Addresses: nns, User: "gohdfs1"})
2 changes: 1 addition & 1 deletion cmd/hdfs/main.go
@@ -187,7 +187,7 @@ func getClient(namenode string) (*hdfs.Client, error) {
return nil, fmt.Errorf("Problem loading configuration: %s", err)
}

options := hdfs.ClientOptionsFromConf(conf)
options := hdfs.DefaultClientOptionsFromConf(conf)
if namenode != "" {
options.Addresses = []string{namenode}
}
91 changes: 60 additions & 31 deletions hadoopconf/hadoopconf.go
@@ -90,43 +90,72 @@ func Load(path string) (HadoopConf, error) {
return conf, nil
}

// Namenodes returns the namenode hosts present in the configuration. The
// returned slice will be sorted and deduped. The values are loaded from
// fs.defaultFS (or the deprecated fs.default.name), or fields beginning with
// dfs.namenode.rpc-address.
//
// To handle 'logical' clusters Namenodes will not return any cluster names
// found in dfs.ha.namenodes.<clustername> properties.
//
// If no namenode addresses can be found, Namenodes returns a nil slice.
func (conf HadoopConf) Namenodes() []string {
nns := make(map[string]bool)
var clusterNames []string

for key, value := range conf {
if strings.Contains(key, "fs.default") {
nnUrl, _ := url.Parse(value)
nns[nnUrl.Host] = true
} else if strings.HasPrefix(key, "dfs.namenode.rpc-address.") {
nns[value] = true
} else if strings.HasPrefix(key, "dfs.ha.namenodes.") {
clusterNames = append(clusterNames, key[len("dfs.ha.namenodes."):])
// DefaultNamenodes returns the namenodes that should be used given the
// configuration's fs.defaultFS (or deprecated fs.default.name) property. If no
// such property is found, i.e. if the configuration is using neither federated
// namespaces nor high availability, then the dfs.namenode.rpc-address property
// is returned if present. Otherwise, a nil slice is returned.
func (conf HadoopConf) DefaultNamenodes() []string {
if fs, ok := conf["fs.defaultFS"]; ok {
// check if default nameservice is defined
fsurl, _ := url.Parse(fs)
if fsurl == nil {
return nil
}
return conf.Namenodes(fsurl.Host)
} else if ns, ok := conf["fs.default.name"]; ok {
// check if default nameservice is defined (through deprecated name)
return conf.Namenodes(ns)
} else if nn, ok := conf["dfs.namenode.rpc-address"]; ok {
// non-HA and non-federated config; return single namenode
return []string{nn}
} else {
// no namenodes found at all
return nil
}
}

for _, cn := range clusterNames {
delete(nns, cn)
}
// namenodesPerNS returns a mapping from clusters to the namenode(s) in those
// clusters.
func (conf HadoopConf) namenodesPerNS() map[string][]string {
nns := make(map[string][]string)
var clusterNames []string

if len(nns) == 0 {
return nil
// this property is required for high availability and/or federation. if
// it's not set, the configuration must be using a non-federated and non-HA
// architecture. check if the property is defined before updating
// clusterNames because strings.Split will return a non-empty slice given
// an empty string, covering up the distinction between no dfs.nameservices
// given and an empty dfs.nameservices.
if nameservices, ok := conf["dfs.nameservices"]; ok {
clusterNames = append(clusterNames, strings.Split(nameservices, ",")...)
}

keys := make([]string, 0, len(nns))
for k, _ := range nns {
keys = append(keys, k)
// obtain logical namenode ids per nameservice
for _, ns := range clusterNames {
nnids, ha := conf["dfs.ha.namenodes."+ns]
if !ha {
// non-HA federated architecture
if nn, ok := conf["dfs.namenode.rpc-address."+ns]; ok {
nns[ns] = append(nns[ns], nn)
}
} else {
// HA architecture
for _, nnid := range strings.Split(nnids, ",") {
if nn, ok := conf["dfs.namenode.rpc-address."+ns+"."+nnid]; ok {
nns[ns] = append(nns[ns], nn)
}
}
sort.Strings(nns[ns])
}
}

sort.Strings(keys)
return keys
return nns
}

// Namenodes returns the namenode hosts present in the configuration for the
// given nameservice. The returned slice will be sorted. If no namenode
// addresses can be found, Namenodes returns a nil slice.
func (conf HadoopConf) Namenodes(ns string) []string {
return conf.namenodesPerNS()[ns]
}
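To make the resolution order concrete, here is a hypothetical configuration (all nameservice names and hosts invented) and the values the exported methods would return under this patch:

```go
package main

import (
	"fmt"

	"github.com/colinmarc/hdfs/v2/hadoopconf"
)

func main() {
	conf := hadoopconf.HadoopConf{
		// fs.defaultFS names the "prod" nameservice, so DefaultNamenodes
		// resolves through Namenodes("prod").
		"fs.defaultFS":     "hdfs://prod",
		"dfs.nameservices": "prod,archive",

		// "prod" is HA, with two logical namenode ids.
		"dfs.ha.namenodes.prod":             "nn1,nn2",
		"dfs.namenode.rpc-address.prod.nn1": "prod-nn1.example.com:8020",
		"dfs.namenode.rpc-address.prod.nn2": "prod-nn2.example.com:8020",

		// "archive" is federated but not HA.
		"dfs.namenode.rpc-address.archive": "archive-nn.example.com:8020",
	}

	fmt.Println(conf.DefaultNamenodes())   // [prod-nn1.example.com:8020 prod-nn2.example.com:8020]
	fmt.Println(conf.Namenodes("archive")) // [archive-nn.example.com:8020]
	fmt.Println(conf.Namenodes("missing")) // [] (nil slice)
}
```
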
4 changes: 2 additions & 2 deletions hadoopconf/hadoopconf_test.go
@@ -19,7 +19,7 @@ func TestConfFallback(t *testing.T) {
conf, err := LoadFromEnvironment()
assert.NoError(t, err)

nns := conf.Namenodes()
nns := conf.DefaultNamenodes()
assert.NoError(t, err)
assert.EqualValues(t, conf2Namenodes, nns, "loading via HADOOP_CONF_DIR (testdata/conf2)")

@@ -28,7 +28,7 @@ func TestConfFallback(t *testing.T) {
conf, err = LoadFromEnvironment()
assert.NoError(t, err)

nns = conf.Namenodes()
nns = conf.DefaultNamenodes()
assert.NoError(t, err)
assert.EqualValues(t, confNamenodes, nns, "loading via HADOOP_HOME (testdata/conf)")
