diff --git a/Research/tensorflow-on-kubeflow/Readme.md b/Research/tensorflow-on-kubeflow/Readme.md
new file mode 100644
index 000000000..e3d7daecf
--- /dev/null
+++ b/Research/tensorflow-on-kubeflow/Readme.md
@@ -0,0 +1,353 @@
+# TensorFlow on Kubeflow on Azure Stack
+
+This module demonstrates how to run TensorFlow jobs on a Kubeflow cluster on Azure Stack.
+
+[TensorFlow](https://www.tensorflow.org/) is a popular open source machine learning framework.
+It was initially developed by the Google Brain team for internal Google use, and later released under
+the Apache License 2.0.
+
+# Prerequisites
+
+Previous familiarity with the following is recommended:
+
+- [Kubernetes](https://kubernetes.io/)
+- [Docker](https://www.docker.com/)
+- [TensorFlow](https://tensorflow.org/)
+- [Azure](http://azure.com)
+- [Kubeflow](https://github.com/kubeflow/kubeflow)
+
+Distributed training is easier to observe if the cluster has more than one node in its pool and, correspondingly, at least that many replicas of the worker container instances.
+
+# Installation
+
+Please see the `Kubeflow on Azure Stack` module of this repository, or [https://www.kubeflow.org](https://www.kubeflow.org), for details on using Kubeflow and installing it on Azure or Azure Stack.
+
+`TFJob` is a custom workload, and you should see it among the registered custom resources:
+
+    azureuser@k8s-master-36519982-0:~$ kubectl get crd | grep tfjob
+    tfjobs.kubeflow.org                      2020-05-06T01:30:30Z
+
+It is also recommended, but not necessary, to have the Kubernetes Dashboard running on the cluster.
+
+
+# Building the Docker image for the distributed MNIST model e2e test
+
+We will run a popular scenario from Kubeflow's repository:
+[K8s Custom Resource and Operator For TensorFlow jobs](https://github.com/kubeflow/tf-operator/).
+
+You do not have to rebuild the image; you can use `kubeflow/tf-dist-mnist-test:1.0`. If you do decide to use your own image, here is how you could build it:
+
+    $ cd tensorflow-on-kubeflow/dist-mnist-e2e-test
+
+Log in to your Docker account:
+
+    $ docker login
+    ... enter the credentials if needed ...
+
+Build the image:
+
+    $ docker build -f Dockerfile -t rollingstones/tf-dist-mnist-test:1.0 ./
+    Sending build context to Docker daemon 18.94kB
+    Step 1/3 : FROM tensorflow/tensorflow:1.5.0
+    1.5.0: Pulling from tensorflow/tensorflow
+    1be7f2b886e8: Pull complete
+    ...
+    7cec7dc064fc: Pull complete
+    Digest: sha256:273cd3c514feb7f93efa3c3e03361969dd3276dbe8267482eb67f5921fb66c0b
+    Status: Downloaded newer image for tensorflow/tensorflow:1.5.0
+     ---> a2d1671e8a93
+    Step 2/3 : ADD . /var/tf_dist_mnist
+     ---> 0cb4d841948b
+    Step 3/3 : ENTRYPOINT ["python", "/var/tf_dist_mnist/dist_mnist.py"]
+     ---> Running in 7defb9c160d7
+    Removing intermediate container 7defb9c160d7
+     ---> b9fc305fb63a
+    Successfully built b9fc305fb63a
+    Successfully tagged rollingstones/tf-dist-mnist-test:1.0
+
+Then push it to Docker Hub or another container registry or artifact repository:
+
+    $ docker push rollingstones/tf-dist-mnist-test:1.0
+    The push refers to repository [docker.io/rollingstones/tf-dist-mnist-test]
+    ce40b6a5f992: Pushed
+    c04a36d9e118: Mounted from tensorflow/tensorflow
+    ...
+    sha256:af441000275fe99aa463d36a814c1b3b38a7d5de45a131f38d97971119730a6a size: 3038
+
+# Running a TFJob
+
+Create a tf_job_mnist-e2e-test.yaml file:
+
+    apiVersion: "kubeflow.org/v1"
+    kind: "TFJob"
+    metadata:
+      name: "dist-mnist-for-e2e-test-demo"
+    spec:
+      tfReplicaSpecs:
+        PS:
+          replicas: 1
+          restartPolicy: OnFailure
+          template:
+            spec:
+              containers:
+                - name: tensorflow
+                  image: rollingstones/tf-dist-mnist-test:1.0
+        Worker:
+          replicas: 3
+          restartPolicy: OnFailure
+          template:
+            spec:
+              containers:
+                - name: tensorflow
+                  image: rollingstones/tf-dist-mnist-test:1.0
+
+To run a TFJob:
+
+    $ kubectl create -f tf_job_mnist-e2e-test.yaml
+
+You should see the pods being initialized:
+
+    $ kubectl get pods
+    NAME                                    READY   STATUS              RESTARTS   AGE
+    ...
+    dist-mnist-for-e2e-test-demo-ps-0       0/1     ContainerCreating   0          23s
+    dist-mnist-for-e2e-test-demo-worker-0   0/1     ContainerCreating   0          23s
+    dist-mnist-for-e2e-test-demo-worker-1   0/1     ContainerCreating   0          23s
+    dist-mnist-for-e2e-test-demo-worker-2   0/1     ContainerCreating   0          23s
+
+The pods will then run and, finally, reach the `Completed` status:
+
+    $ kubectl get pods
+    NAME                                    READY   STATUS      RESTARTS   AGE
+    ...
+    dist-mnist-for-e2e-test-demo-worker-0   0/1     Completed   0          9m21s
+    dist-mnist-for-e2e-test-demo-worker-1   0/1     Completed   0          9m21s
+    dist-mnist-for-e2e-test-demo-worker-2   0/1     Completed   0          9m21s
+
+Here is an example of the log on a worker node:
+
+    $ kubectl logs dist-mnist-for-e2e-test-demo-worker-0
+    /usr/local/lib/python2.7/dist-packages/h5py/__init__.py:36: FutureWarning: Conversion of the second argument of issubdtype from `float` to `np.floating` is deprecated. In future, it will be treated as `np.float64 == np.dtype(float).type`.
+      from ._conv import register_converters as _register_converters
+    2020-05-13 19:30:28.384657: I tensorflow/core/platform/cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 AVX512F FMA
+    2020-05-13 19:30:28.385503: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> dist-mnist-for-e2e-test-my-ps-0.default.svc:2222}
+    2020-05-13 19:30:28.385534: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2222, 1 -> dist-mnist-for-e2e-test-my-worker-1.default.svc:2222, 2 -> dist-mnist-for-e2e-test-my-worker-2.default.svc:2222}
+    2020-05-13 19:30:28.386049: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:324] Started server with target: grpc://localhost:2222
+    WARNING:tensorflow:From /var/tf_dist_mnist/dist_mnist.py:239: __init__ (from tensorflow.python.training.supervisor) is deprecated and will be removed in a future version.
+    Instructions for updating:
+    Please switch to tf.train.MonitoredTrainingSession
+    2020-05-13 19:30:37.526195: I tensorflow/core/distributed_runtime/master_session.cc:1017] Start master session 1ddfe25446c51488 with config: device_filters: "/job:ps" device_filters: "/job:worker/task:0" allow_soft_placement: true
+    Successfully downloaded train-images-idx3-ubyte.gz 9912422 bytes.
+    Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz
+    Successfully downloaded train-labels-idx1-ubyte.gz 28881 bytes.
+    Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz
+    Successfully downloaded t10k-images-idx3-ubyte.gz 1648877 bytes.
+    Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz
+    Successfully downloaded t10k-labels-idx1-ubyte.gz 4542 bytes.
+    Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz
+    job name = worker
+    task index = 0
+    Worker 0: Initializing session...
+    Worker 0: Session initialization complete.
+    Training begins @ 1589398237.717394
+    1589398237.837679: Worker 0: training step 1 done (global step: 0)
+    1589398237.849407: Worker 0: training step 2 done (global step: 1)
+    1589398237.860992: Worker 0: training step 3 done (global step: 2)
+    ...
+    1589398297.489717: Worker 0: training step 5745 done (global step: 19996)
+    1589398297.505792: Worker 0: training step 5746 done (global step: 19997)
+    1589398297.517557: Worker 0: training step 5747 done (global step: 20000)
+    Training ends @ 1589398297.517635
+    Training elapsed time: 59.800241 s
+    After 20000 training step(s), validation cross entropy = 2882.02
+
+
+It is worth pointing out that the nodes are assigned individual indices:
+
+    2020-05-13 19:30:28.385503: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> dist-mnist-for-e2e-test-my-ps-0.default.svc:2222}
+    2020-05-13 19:30:28.385534: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2222, 1 -> dist-mnist-for-e2e-test-my-worker-1.default.svc:2222, 2 -> dist-mnist-for-e2e-test-my-worker-2.default.svc:2222}
+    2020-05-13 19:30:28.386049: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:324] Started server with target: grpc://localhost:2222
+
+# Persistence for models and data
+
+If you would like to save the results of model training, you can do so from your scripts
+using the Kubernetes volumes you mount. However, on Azure Stack you do not have `azurefile`
+available yet, but there are many other options; e.g. you can use network storage.
+
+You can follow [Installing Network Storage](installing_network_storage.md), but
+usually it is better to ask your Azure Stack administrator to create a Samba server for you.
+
+## Creating SMB clients
+
+On the client side, if you have to do it yourself, install a Samba client:
+
+    $ sudo apt install -y smbclient cifs-utils
+
+Create a folder for mounting:
+
+    $ sudo mkdir /mnt/shares
+    $ sudo chown azureuser:azureuser /mnt/shares
+
+Put your share's credentials in `/etc/samba`:
+
+    $ sudo vi /etc/samba/.sambacreds
+    $ cat /etc/samba/.sambacreds
+    username=sambauser1
+    password=
+    domain=WORKGROUP
+
+Define the mount in your `fstab` file, pointing to your `.sambacreds` file and the mount point we created:
+
+    $ sudo vi /etc/fstab
+    $ cat /etc/fstab
+    ...
+    //12.34.259.89/sambauser1 /mnt/shares cifs rw,uid=azureuser,guest,noperm,credentials=/etc/samba/.sambacreds 0 0
+    ...
+
+    $ sudo mount /mnt/shares
+
+Verify the mount; you should see your server's IP and Samba user:
+
+    $ sudo mount
+    ...
+    //12.34.259.89/sambauser1 on /mnt/shares type cifs (rw,relatime,vers=default,cache=strict,username=sambauser1,domain=WORKGROUP,uid=1000,forceuid,gid=0,noforcegid,addr=12.34.259.89,file_mode=0755,dir_mode=0755,soft,nounix,serverino,mapposix,noperm,rsize=1048576,wsize=1048576,echo_interval=60,actimeo=1)
+    ...
+
+Try the following from two different nodes of your cluster.
On one: + + $ echo "from machine a" > /mnt/shares/from_machine_a.txt + +On the other: + + $ ls /mnt/shares/ + from_machine_a.txt + $ cat /mnt/shares/from_machine_a.txt + from machine a + +You would need to repeat the same installation process on all Kubernetes nodes, because +the pods could be instantiated anywhere and will try to access the local storage there. + +## Creating storage class, pv, and pvc + +Create a .yaml with sc/pv/pvc definitions pointing to the created shared folder: + +``` +kind: StorageClass +apiVersion: storage.k8s.io/v1 +metadata: + name: local-storage +provisioner: kubernetes.io/no-provisioner +#reclaimPolicy: Retain +#volumeBindingMode: WaitForFirstConsumer +--- +kind: PersistentVolume +apiVersion: v1 +metadata: + name: samba-share-volume + labels: + type: local + app: tfjob +spec: + storageClassName: local-storage + capacity: + storage: 2Gi + accessModes: + - ReadWriteMany + hostPath: + path: "/mnt/shares/kfbuffer" +--- +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: samba-share-claim +spec: + storageClassName: local-storage + accessModes: + - ReadWriteMany + resources: + requests: + storage: 1Gi +``` + +Apply it: + + $ kubectl create -f persistence.yaml + storageclass.storage.k8s.io/local-storage created + persistentvolume/samba-share-volume created + persistentvolumeclaim/samba-share-claim created + + $ kubectl get pvc + NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE + samba-share-claim Bound samba-share-volume 20Gi RWX local-storage 2m24s + + $ kubectl get pv + NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE + ... + samba-share-volume 20Gi RWX Retain Bound default/samba-share-claim local-storage 2m41s + ... + +You should see the pv being `Bound`, and it is available for your applications. + + $ kubectl describe pvc samba-share-claim + Name: samba-share-claim + Namespace: default + StorageClass: local-storage + Status: Bound + Volume: samba-share-volume + Labels: + Annotations: pv.kubernetes.io/bind-completed: yes + pv.kubernetes.io/bound-by-controller: yes + Finalizers: [kubernetes.io/pvc-protection] + Capacity: 20Gi + Access Modes: RWX + VolumeMode: Filesystem + Mounted By: dist-mnist-for-e2e-test-demo-ps-0 + dist-mnist-for-e2e-test-demo-worker-0 + dist-mnist-for-e2e-test-demo-worker-1 + dist-mnist-for-e2e-test-demo-worker-2 + Events: + +And the volume itself marked as `HostPath`: + + $ kubectl describe pv samba-share-volume + Name: samba-share-volume + Labels: type=local + Annotations: pv.kubernetes.io/bound-by-controller: yes + Finalizers: [kubernetes.io/pv-protection] + StorageClass: local-storage + Status: Bound + Claim: default/samba-share-claim + Reclaim Policy: Retain + Access Modes: RWX + VolumeMode: Filesystem + Capacity: 20Gi + Node Affinity: + Message: + Source: + Type: HostPath (bare host directory volume) + Path: /mnt/shares/kfbuffer + HostPathType: + Events: + + +Now, from your script in the container you can write to that folder your serialized models during the intermediate +steps. It is better to let the master node (with rank 0) to do the logging and serialization. And the master node +should do the deserialization if needed. 
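+
+As a concrete illustration, here is a minimal sketch (TensorFlow 1.x, the same major version as
+the image built above) of saving a checkpoint to the shared volume and restoring it. It assumes
+the PVC is mounted at `/tmp/mnist-data`, as in `tf_job_mnist-e2e-test.yaml`; adjust the path to
+wherever you mount your claim. Note that in `dist_mnist.py` the `tf.train.Supervisor` is already
+created with `logdir="/tmp/mnist-data"`, so checkpoints and event files are written there
+automatically.
+
+```python
+import tensorflow as tf
+
+ckpt_dir = "/tmp/mnist-data"  # backed by the samba-share-claim PVC in this example
+
+w = tf.Variable(tf.zeros([10]), name="w")
+saver = tf.train.Saver()
+
+with tf.Session() as sess:
+    sess.run(tf.global_variables_initializer())
+    # Typically only the chief (task_index == 0) writes checkpoints.
+    saver.save(sess, ckpt_dir + "/model.ckpt", global_step=0)
+
+with tf.Session() as sess:
+    # Any replica that mounts the same volume can restore the latest checkpoint.
+    saver.restore(sess, tf.train.latest_checkpoint(ckpt_dir))
+```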
+ +For our example, we save the `checkpoint`, model metadata and other information at our shared volume: + + $ ls /mnt/shares/kfbuffer/ + checkpoint model.ckpt-0.meta + events.out.tfevents.1592351909.dist-mnist-for-e2e-test-demo2-worker-0 t10k-images-idx3-ubyte.gz + graph.pbtxt t10k-labels-idx1-ubyte.gz + model.ckpt-0.data-00000-of-00001 train-images-idx3-ubyte.gz + model.ckpt-0.index train-labels-idx1-ubyte.gz + +See [save_and_load.ipynb](https://github.com/tensorflow/docs/blob/master/site/en/tutorials/distribute/save_and_load.ipynb) example notebook. + +# Links + +For further information: + +- https://www.kubeflow.org/docs/components/training/tftraining/ +- https://www.tensorflow.org/ diff --git a/Research/tensorflow-on-kubeflow/dist-mnist-e2e-test/Dockerfile b/Research/tensorflow-on-kubeflow/dist-mnist-e2e-test/Dockerfile new file mode 100644 index 000000000..0fdba7e53 --- /dev/null +++ b/Research/tensorflow-on-kubeflow/dist-mnist-e2e-test/Dockerfile @@ -0,0 +1,18 @@ +# Copyright 2016 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM tensorflow/tensorflow:1.5.0 +RUN mkdir /tmp/mnist-data +ADD . /var/tf_dist_mnist +ENTRYPOINT ["python", "/var/tf_dist_mnist/dist_mnist.py"] diff --git a/Research/tensorflow-on-kubeflow/dist-mnist-e2e-test/dist_mnist.py b/Research/tensorflow-on-kubeflow/dist-mnist-e2e-test/dist_mnist.py new file mode 100644 index 000000000..f3b1763ca --- /dev/null +++ b/Research/tensorflow-on-kubeflow/dist-mnist-e2e-test/dist_mnist.py @@ -0,0 +1,303 @@ +# Copyright 2016 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Distributed MNIST training and validation, with model replicas. +A simple softmax model with one hidden layer is defined. The parameters +(weights and biases) are located on one parameter server (ps), while the ops +are executed on two worker nodes by default. The TF sessions also run on the +worker node. +Multiple invocations of this script can be done in parallel, with different +values for --task_index. There should be exactly one invocation with +--task_index, which will create a master session that carries out variable +initialization. The other, non-master, sessions will wait for the master +session to finish the initialization before proceeding to the training stage. 
+The coordination between the multiple worker invocations occurs due to +the definition of the parameters on the same ps devices. The parameter updates +from one worker is visible to all other workers. As such, the workers can +perform forward computation and gradient calculation in parallel, which +should lead to increased training speed for the simple model. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import json +import math +import os +import sys +import tempfile +import time + +import tensorflow as tf +from tensorflow.examples.tutorials.mnist import input_data + +flags = tf.app.flags +flags.DEFINE_string("data_dir", "/tmp/mnist-data", + "Directory for storing mnist data") +flags.DEFINE_boolean("download_only", False, + "Only perform downloading of data; Do not proceed to " + "session preparation, model definition or training") +flags.DEFINE_integer("task_index", None, + "Worker task index, should be >= 0. task_index=0 is " + "the master worker task the performs the variable " + "initialization ") +flags.DEFINE_integer("num_gpus", 1, "Total number of gpus for each machine." + "If you don't use GPU, please set it to '0'") +flags.DEFINE_integer("replicas_to_aggregate", None, + "Number of replicas to aggregate before parameter update" + "is applied (For sync_replicas mode only; default: " + "num_workers)") +flags.DEFINE_integer("hidden_units", 100, + "Number of units in the hidden layer of the NN") +flags.DEFINE_integer("train_steps", 20000, + "Number of (global) training steps to perform") +flags.DEFINE_integer("batch_size", 100, "Training batch size") +flags.DEFINE_float("learning_rate", 0.01, "Learning rate") +flags.DEFINE_boolean( + "sync_replicas", False, + "Use the sync_replicas (synchronized replicas) mode, " + "wherein the parameter updates from workers are aggregated " + "before applied to avoid stale gradients") +flags.DEFINE_boolean( + "existing_servers", False, "Whether servers already exists. If True, " + "will use the worker hosts via their GRPC URLs (one client process " + "per worker host). Otherwise, will create an in-process TensorFlow " + "server.") +flags.DEFINE_string("ps_hosts", "localhost:2222", + "Comma-separated list of hostname:port pairs") +flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224", + "Comma-separated list of hostname:port pairs") +flags.DEFINE_string("job_name", None, "job name: worker or ps") + +FLAGS = flags.FLAGS + +IMAGE_PIXELS = 28 + +# Example: +# cluster = {'ps': ['host1:2222', 'host2:2222'], +# 'worker': ['host3:2222', 'host4:2222', 'host5:2222']} +# os.environ['TF_CONFIG'] = json.dumps( +# {'cluster': cluster, +# 'task': {'type': 'worker', 'index': 1}}) + +def main(unused_argv): + # Parse environment variable TF_CONFIG to get job_name and task_index + + # If not explicitly specified in the constructor and the TF_CONFIG + # environment variable is present, load cluster_spec from TF_CONFIG. 
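+  # Note: when this script runs as a TFJob under Kubeflow, the tf-operator
+  # injects TF_CONFIG into each replica's environment with the cluster spec
+  # and this replica's task type and index (see the example above).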
+ tf_config = json.loads(os.environ.get('TF_CONFIG') or '{}') + task_config = tf_config.get('task', {}) + task_type = task_config.get('type') + task_index = task_config.get('index') + + FLAGS.job_name = task_type + FLAGS.task_index = task_index + + mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True) + if FLAGS.download_only: + sys.exit(0) + + if FLAGS.job_name is None or FLAGS.job_name == "": + raise ValueError("Must specify an explicit `job_name`") + if FLAGS.task_index is None or FLAGS.task_index == "": + raise ValueError("Must specify an explicit `task_index`") + + print("job name = %s" % FLAGS.job_name) + print("task index = %d" % FLAGS.task_index) + + cluster_config = tf_config.get('cluster', {}) + ps_hosts = cluster_config.get('ps') + worker_hosts = cluster_config.get('worker') + + ps_hosts_str = ','.join(ps_hosts) + worker_hosts_str = ','.join(worker_hosts) + + FLAGS.ps_hosts = ps_hosts_str + FLAGS.worker_hosts = worker_hosts_str + + # Construct the cluster and start the server + ps_spec = FLAGS.ps_hosts.split(",") + worker_spec = FLAGS.worker_hosts.split(",") + + # Get the number of workers. + num_workers = len(worker_spec) + + cluster = tf.train.ClusterSpec({"ps": ps_spec, "worker": worker_spec}) + + if not FLAGS.existing_servers: + # Not using existing servers. Create an in-process server. + server = tf.train.Server( + cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) + if FLAGS.job_name == "ps": + server.join() + + is_chief = (FLAGS.task_index == 0) + if FLAGS.num_gpus > 0: + # Avoid gpu allocation conflict: now allocate task_num -> #gpu + # for each worker in the corresponding machine + gpu = (FLAGS.task_index % FLAGS.num_gpus) + worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu) + elif FLAGS.num_gpus == 0: + # Just allocate the CPU to worker server + cpu = 0 + worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu) + # The device setter will automatically place Variables ops on separate + # parameter servers (ps). The non-Variable ops will be placed on the workers. 
+ # The ps use CPU and workers use corresponding GPU + with tf.device( + tf.train.replica_device_setter( + worker_device=worker_device, + ps_device="/job:ps/cpu:0", + cluster=cluster)): + global_step = tf.Variable(0, name="global_step", trainable=False) + + # Variables of the hidden layer + hid_w = tf.Variable( + tf.truncated_normal( + [IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units], + stddev=1.0 / IMAGE_PIXELS), + name="hid_w") + hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b") + + # Variables of the softmax layer + sm_w = tf.Variable( + tf.truncated_normal( + [FLAGS.hidden_units, 10], + stddev=1.0 / math.sqrt(FLAGS.hidden_units)), + name="sm_w") + sm_b = tf.Variable(tf.zeros([10]), name="sm_b") + + # Ops: located on the worker specified with FLAGS.task_index + x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS]) + y_ = tf.placeholder(tf.float32, [None, 10]) + + hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b) + hid = tf.nn.relu(hid_lin) + + y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b)) + cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0))) + + opt = tf.train.AdamOptimizer(FLAGS.learning_rate) + + if FLAGS.sync_replicas: + if FLAGS.replicas_to_aggregate is None: + replicas_to_aggregate = num_workers + else: + replicas_to_aggregate = FLAGS.replicas_to_aggregate + + opt = tf.train.SyncReplicasOptimizer( + opt, + replicas_to_aggregate=replicas_to_aggregate, + total_num_replicas=num_workers, + name="mnist_sync_replicas") + + train_step = opt.minimize(cross_entropy, global_step=global_step) + + if FLAGS.sync_replicas: + local_init_op = opt.local_step_init_op + if is_chief: + local_init_op = opt.chief_init_op + + ready_for_local_init_op = opt.ready_for_local_init_op + + # Initial token and chief queue runners required by the sync_replicas mode + chief_queue_runner = opt.get_chief_queue_runner() + sync_init_op = opt.get_init_tokens_op() + + init_op = tf.global_variables_initializer() + #train_dir = tempfile.mkdtemp() + train_dir = "/tmp/mnist-data" + + if FLAGS.sync_replicas: + sv = tf.train.Supervisor( + is_chief=is_chief, + logdir=train_dir, + init_op=init_op, + local_init_op=local_init_op, + ready_for_local_init_op=ready_for_local_init_op, + recovery_wait_secs=1, + global_step=global_step) + else: + sv = tf.train.Supervisor( + is_chief=is_chief, + logdir=train_dir, + init_op=init_op, + recovery_wait_secs=1, + global_step=global_step) + + sess_config = tf.ConfigProto( + allow_soft_placement=True, + log_device_placement=False, + device_filters=["/job:ps", + "/job:worker/task:%d" % FLAGS.task_index]) + + # The chief worker (task_index==0) session will prepare the session, + # while the remaining workers will wait for the preparation to complete. + if is_chief: + print("Worker %d: Initializing session..." % FLAGS.task_index) + else: + print("Worker %d: Waiting for session to be initialized..." % + FLAGS.task_index) + + if FLAGS.existing_servers: + server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index] + print("Using existing server at: %s" % server_grpc_url) + + sess = sv.prepare_or_wait_for_session(server_grpc_url, config=sess_config) + else: + sess = sv.prepare_or_wait_for_session(server.target, config=sess_config) + + print("Worker %d: Session initialization complete." % FLAGS.task_index) + + if FLAGS.sync_replicas and is_chief: + # Chief worker will start the chief queue runner and call the init op. 
+ sess.run(sync_init_op) + sv.start_queue_runners(sess, [chief_queue_runner]) + + # Perform training + time_begin = time.time() + print("Training begins @ %f" % time_begin) + + local_step = 0 + while True: + # Training feed + batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size) + train_feed = {x: batch_xs, y_: batch_ys} + + _, step = sess.run([train_step, global_step], feed_dict=train_feed) + local_step += 1 + + now = time.time() + print("%f: Worker %d: training step %d done (global step: %d)" % + (now, FLAGS.task_index, local_step, step)) + + if step >= FLAGS.train_steps: + break + + time_end = time.time() + print("Training ends @ %f" % time_end) + training_time = time_end - time_begin + print("Training elapsed time: %f s" % training_time) + + # Validation feed + val_feed = {x: mnist.validation.images, y_: mnist.validation.labels} + val_xent = sess.run(cross_entropy, feed_dict=val_feed) + print("After %d training step(s), validation cross entropy = %g" % + (FLAGS.train_steps, val_xent)) + + +if __name__ == "__main__": + tf.app.run() + \ No newline at end of file diff --git a/Research/tensorflow-on-kubeflow/dist-mnist-e2e-test/persistence.yaml b/Research/tensorflow-on-kubeflow/dist-mnist-e2e-test/persistence.yaml new file mode 100644 index 000000000..cea633336 --- /dev/null +++ b/Research/tensorflow-on-kubeflow/dist-mnist-e2e-test/persistence.yaml @@ -0,0 +1,40 @@ +kind: StorageClass +apiVersion: storage.k8s.io/v1 +metadata: + name: local-storage +provisioner: kubernetes.io/no-provisioner +#reclaimPolicy: Retain +#volumeBindingMode: WaitForFirstConsumer + +--- + +kind: PersistentVolume +apiVersion: v1 +metadata: + name: samba-share-volume + labels: + type: local +spec: + storageClassName: local-storage + capacity: + storage: 2Gi + accessModes: + - ReadWriteMany + hostPath: + path: "/mnt/shares/kfbuffer" + +--- + +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: samba-share-claim +spec: + storageClassName: local-storage + accessModes: + - ReadWriteMany + resources: + requests: + storage: 1Gi + + \ No newline at end of file diff --git a/Research/tensorflow-on-kubeflow/dist-mnist-e2e-test/tf_job_mnist-e2e-test.yaml b/Research/tensorflow-on-kubeflow/dist-mnist-e2e-test/tf_job_mnist-e2e-test.yaml new file mode 100644 index 000000000..5e9503375 --- /dev/null +++ b/Research/tensorflow-on-kubeflow/dist-mnist-e2e-test/tf_job_mnist-e2e-test.yaml @@ -0,0 +1,36 @@ +apiVersion: "kubeflow.org/v1" +kind: "TFJob" +metadata: + name: "dist-mnist-for-e2e-test-demo" +spec: + tfReplicaSpecs: + PS: + replicas: 1 + restartPolicy: OnFailure + template: + spec: + containers: + - name: tensorflow + image: kubeflow/tf-dist-mnist-test:1.0 + volumeMounts: + - mountPath: "/tmp/mnist-data" + name: samba-share-volume2 + volumes: + - name: samba-share-volume2 + persistentVolumeClaim: + claimName: samba-share-claim + Worker: + replicas: 3 + restartPolicy: OnFailure + template: + spec: + containers: + - name: tensorflow + image: kubeflow/tf-dist-mnist-test:1.0 + volumeMounts: + - mountPath: "/tmp/mnist-data" + name: samba-share-volume2 + volumes: + - name: samba-share-volume2 + persistentVolumeClaim: + claimName: samba-share-claim diff --git a/Research/tensorflow-on-kubeflow/installing_network_storage.md b/Research/tensorflow-on-kubeflow/installing_network_storage.md new file mode 100644 index 000000000..ad7a43e36 --- /dev/null +++ b/Research/tensorflow-on-kubeflow/installing_network_storage.md @@ -0,0 +1,119 @@ +# Installing Network Storage + +It is one of many ways to create a network-accessible 
storage, we will create a Samba server. + +It is better for security and performance to create the storage server within +the same Azure Stack cluster where you deployed your Kubernetes/Kubeflow cluster. +Using your Azure Stack portal, create a virtual machine. On that machine: + + + $ sudo apt update + $ sudo apt -y install samba + $ sudo mkdir /home/share + $ sudo chmod 777 /home/share + $ sudo vi /etc/samba/smb.conf + +Add to the file the entries you would like to use, for example: + + ... + [sambashare] + comment = Samba on Ubuntu + path = /home/azureuser/sambashare + read only = no + browsable = yes + + [sambauser1] + path = /home/share/sambauser1 + read only = no + browseable = no + force create mode = 0660 + force directory mode = 2770 + valid users = @sambauser1 @sambashare + + [smbadmin] + path = /home/share/smbadmin + read only = no + browseable = yes + force create mode = 0660 + force directory mode = 2770 + valid users = @sambashare @smbadmin + +Create the users and update the folder ownership: + + $ sudo chgrp sambashare /home/share + $ sudo useradd -M -d /home/share/sambauser1 -s /usr/sbin/nologin -G sambashare sambauser1 + $ sudo mkdir /home/share/sambauser1 + $ sudo chown sambauser1:sambashare /home/share/sambauser1 + $ sudo chmod 2770 /home/share/sambauser1 + +And created Samba users: + + $ sudo smbpasswd -a sambauser1 + New SMB password: + Retype new SMB password: + Added user sambauser1. + +And enable this user: + + $ sudo smbpasswd -e sambauser1 + Enabled user sambauser1. + +Create an admin user: + + $ sudo useradd -M -d /home/share/smbadmin -s /usr/sbin/nologin -G sambashare smbadmin + $ sudo mkdir /home/share/smbadmin + $ sudo smbpasswd -a smbadmin + New SMB password: + Retype new SMB password: + Added user smbadmin. + $ sudo smbpasswd -e smbadmin + Enabled user smbadmin. + $ sudo chown smbadmin:sambashare /home/share/smbadmin + $ sudo chmod 2770 /home/share/smbadmin + + $ sudo systemctl restart smbd nmbd + +To check the status: + + $ systemctl status smbd + ● smbd.service - Samba SMB Daemon + Loaded: loaded (/lib/systemd/system/smbd.service; enabled; vendor preset: enabled) + Active: active (running) since Fri 2020-05-08 01:11:37 UTC; 19s ago + Docs: man:smbd(8) + man:samba(7) + man:smb.conf(5) + Main PID: 19151 (smbd) + Status: "smbd: ready to serve connections..." + Tasks: 4 (limit: 8303) + CGroup: /system.slice/smbd.service + ├─19151 /usr/sbin/smbd --foreground --no-process-group + ├─19165 /usr/sbin/smbd --foreground --no-process-group + ├─19166 /usr/sbin/smbd --foreground --no-process-group + └─19168 /usr/sbin/smbd --foreground --no-process-group + + May 08 01:11:37 sambadata systemd[1]: Starting Samba SMB Daemon... + May 08 01:11:37 sambadata systemd[1]: Started Samba SMB Daemon. + +Update your firewall rules to let the smb traffic through: + + $ sudo ufw allow 'Samba' + Rules updated + Rules updated (v6) + +And verify that the ports are listening: + + $ sudo netstat -tulpn | egrep "samba|smbd|nmbd|winbind" + tcp 0 0 0.0.0.0:139 0.0.0.0:* LISTEN 19151/smbd + tcp 0 0 0.0.0.0:445 0.0.0.0:* LISTEN 19151/smbd + tcp6 0 0 :::139 :::* LISTEN 19151/smbd + tcp6 0 0 :::445 :::* LISTEN 19151/smbd + udp 0 0 172.16.0.255:137 0.0.0.0:* 19126/nmbd + udp 0 0 172.16.0.4:137 0.0.0.0:* 19126/nmbd + udp 0 0 0.0.0.0:137 0.0.0.0:* 19126/nmbd + udp 0 0 172.16.0.255:138 0.0.0.0:* 19126/nmbd + udp 0 0 172.16.0.4:138 0.0.0.0:* 19126/nmbd + udp 0 0 0.0.0.0:138 0.0.0.0:* 19126/nmbd + +You are ready to get the samba clients connect to your server. 
+ +[Back](Readme.md) \ No newline at end of file