Skip to content

Commit

Permalink
Merge pull request #8 from 30x/XAPID-1070
Browse files Browse the repository at this point in the history
Xapid 1070 add JWT token to download requests from apid to blobserver
  • Loading branch information
haomingzhang authored Aug 11, 2017
2 parents 6cad27e + 5efac39 commit 604b37b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 29 deletions.
30 changes: 16 additions & 14 deletions bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ type bundleManager struct {
apiMan apiManagerInterface
concurrentDownloads int
markDeploymentFailedAfter time.Duration
bundleDownloadConnTimeout time.Duration
bundleRetryDelay time.Duration
bundleCleanupDelay time.Duration
downloadQueue chan *DownloadRequest
isClosed *int32
workers []*BundleDownloader
client *http.Client
}

type blobServerResponse struct {
Expand Down Expand Up @@ -103,7 +103,7 @@ func (bm *bundleManager) makeDownloadRequest(id string) *DownloadRequest {
blobId: id,
backoffFunc: createBackoff(retryIn, maxBackOff),
markFailedAt: markFailedAt,
connTimeout: bm.bundleDownloadConnTimeout,
client: bm.client,
}
}

Expand Down Expand Up @@ -158,8 +158,8 @@ type DownloadRequest struct {
blobId string
backoffFunc func()
markFailedAt time.Time
connTimeout time.Duration
blobServerURL string
client *http.Client
}

func (r *DownloadRequest) downloadBundle() error {
Expand All @@ -172,7 +172,7 @@ func (r *DownloadRequest) downloadBundle() error {
}
}

downloadedFile, err := downloadFromURI(r.blobServerURL, r.blobId, r.connTimeout)
downloadedFile, err := downloadFromURI(r.client, r.blobServerURL, r.blobId)

if err != nil {
log.Errorf("Unable to download blob file blobId=%s err:%v", r.blobId, err)
Expand Down Expand Up @@ -210,7 +210,7 @@ func getBlobFilePath(blobId string) string {
return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(blobId)))
}

func getSignedURL(blobServerURL string, blobId string, bundleDownloadConnTimeout time.Duration) (string, error) {
func getSignedURL(client *http.Client, blobServerURL string, blobId string) (string, error) {

blobUri, err := url.Parse(blobServerURL)
if err != nil {
Expand All @@ -224,7 +224,7 @@ func getSignedURL(blobServerURL string, blobId string, bundleDownloadConnTimeout

uri := blobUri.String()

surl, err := getURIReader(uri, bundleDownloadConnTimeout)
surl, err := getUriReaderWithAuth(client, uri)
if err != nil {
log.Errorf("Unable to get signed URL from BlobServer %s: %v", uri, err)
return "", err
Expand All @@ -248,12 +248,12 @@ func getSignedURL(blobServerURL string, blobId string, bundleDownloadConnTimeout

// downloadFromURI involves retrieving the signed URL for the blob, and storing the resource locally
// after downloading the resource from GCS (via the signed URL)
func downloadFromURI(blobServerURL string, blobId string, bundleDownloadConnTimeout time.Duration) (tempFileName string, err error) {
func downloadFromURI(client *http.Client, blobServerURL string, blobId string) (tempFileName string, err error) {

var tempFile *os.File
log.Debugf("Downloading bundle: %s", blobId)

uri, err := getSignedURL(blobServerURL, blobId, bundleDownloadConnTimeout)
uri, err := getSignedURL(client, blobServerURL, blobId)
if err != nil {
log.Errorf("Unable to get signed URL for blobId {%s}, error : {%v}", blobId, err)
return
Expand All @@ -268,7 +268,7 @@ func downloadFromURI(blobServerURL string, blobId string, bundleDownloadConnTime
tempFileName = tempFile.Name()

var confReader io.ReadCloser
confReader, err = getURIReader(uri, bundleDownloadConnTimeout)
confReader, err = getUriReaderWithAuth(client, uri)
if err != nil {
log.Errorf("Unable to retrieve bundle %s: %v", uri, err)
return
Expand All @@ -286,12 +286,14 @@ func downloadFromURI(blobServerURL string, blobId string, bundleDownloadConnTime
}

// retrieveBundle retrieves bundle data from a URI
func getURIReader(uriString string, bundleDownloadConnTimeout time.Duration) (io.ReadCloser, error) {

client := http.Client{
Timeout: bundleDownloadConnTimeout,
func getUriReaderWithAuth(client *http.Client, uriString string) (io.ReadCloser, error) {
req, err := http.NewRequest("GET", uriString, nil)
if err != nil {
return nil, err
}
res, err := client.Get(uriString)
// add Auth
req.Header.Add("Authorization", getBearerToken())
res, err := client.Do(req)
if err != nil {
return nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package apiGatewayConfDeploy
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"net/http"
"sync/atomic"
"time"
)
Expand Down Expand Up @@ -65,11 +66,16 @@ var _ = Describe("api", func() {
apiMan: dummyApiMan,
concurrentDownloads: concurrentDownloads,
markDeploymentFailedAfter: 5 * time.Second,
bundleDownloadConnTimeout: time.Second,
bundleRetryDelay: time.Second,
bundleCleanupDelay: 5 * time.Second,
downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
isClosed: new(int32),
client: &http.Client{
Timeout: time.Second,
Transport: &http.Transport{
MaxIdleConnsPerHost: 10,
},
},
}
testBundleMan.initializeBundleDownloading()
time.Sleep(100 * time.Millisecond)
Expand All @@ -94,7 +100,7 @@ var _ = Describe("api", func() {
// setup timeout
atomic.StoreInt32(blobServer.signedTimeout, 1)
atomic.StoreInt32(blobServer.blobTimeout, 1)
testBundleMan.bundleDownloadConnTimeout = 500 * time.Millisecond
testBundleMan.client.Timeout = 500 * time.Millisecond
testBundleMan.bundleRetryDelay = 50 * time.Millisecond

// download blobs
Expand All @@ -109,7 +115,7 @@ var _ = Describe("api", func() {
// setup timeout
atomic.StoreInt32(blobServer.signedTimeout, 1)
atomic.StoreInt32(blobServer.blobTimeout, 1)
testBundleMan.bundleDownloadConnTimeout = 100 * time.Millisecond
testBundleMan.client.Timeout = 100 * time.Millisecond
testBundleMan.bundleRetryDelay = 100 * time.Millisecond
testBundleMan.markDeploymentFailedAfter = 200 * time.Millisecond

Expand Down
25 changes: 13 additions & 12 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
configDownloadQueueSize = "apigeesync_download_queue_size"
configBlobServerBaseURI = "apigeesync_blob_server_base"
configStoragePath = "local_storage_path"
maxIdleConnsPerHost = 10
maxIdleConnsPerHost = 50
httpTimeout = time.Minute
)

Expand Down Expand Up @@ -118,20 +118,21 @@ func initPlugin(s apid.Services) (apid.PluginData, error) {
apidClusterId = config.GetString(configApidClusterID)

// initialize tracker client
httpClient := &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: maxIdleConnsPerHost,
},
Timeout: httpTimeout,
CheckRedirect: func(req *http.Request, _ []*http.Request) error {
req.Header.Set("Authorization", getBearerToken())
return nil
},
}

client := &trackerClient{
trackerBaseUrl: configApiServerBaseURI,
clusterId: apidClusterId,
httpclient: &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: maxIdleConnsPerHost,
},
Timeout: httpTimeout,
CheckRedirect: func(req *http.Request, _ []*http.Request) error {
req.Header.Set("Authorization", getBearerToken())
return nil
},
},
httpclient: httpClient,
}

// initialize db manager
Expand Down Expand Up @@ -176,11 +177,11 @@ func initPlugin(s apid.Services) (apid.PluginData, error) {
apiMan: apiMan,
concurrentDownloads: concurrentDownloads,
markDeploymentFailedAfter: markDeploymentFailedAfter,
bundleDownloadConnTimeout: bundleDownloadConnTimeout,
bundleRetryDelay: time.Second,
bundleCleanupDelay: bundleCleanupDelay,
downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
isClosed: new(int32),
client: httpClient,
}

bundleMan.initializeBundleDownloading()
Expand Down

0 comments on commit 604b37b

Please sign in to comment.