Skip to content

Commit 25268b0

Browse files
committed
Made docker image download controllable in shuffle-shared vs just worker
1 parent 277a4d8 commit 25268b0

File tree

4 files changed

+139
-19
lines changed

4 files changed

+139
-19
lines changed

codegen.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@ import (
1616
"sort"
1717
"strconv"
1818
"strings"
19+
"net/http"
1920

2021
"cloud.google.com/go/storage"
2122
"github.com/frikky/kin-openapi/openapi3"
23+
docker "github.com/docker/docker/client"
2224

2325
//"github.com/satori/go.uuid"
2426
"gopkg.in/yaml.v2"
2527
)
2628

29+
var downloadedImages = []string{}
2730
var pythonAllowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_"
2831
var pythonReplacements = map[string]string{
2932
"[": "",
@@ -3771,3 +3774,125 @@ func RemoveJsonValues(input []byte, depth int64) ([]byte, string, error) {
37713774

37723775
return input, keyToken, nil
37733776
}
3777+
3778+
func DownloadDockerImageBackend(topClient *http.Client, imageName string) error {
3779+
// Check environment SHUFFLE_AUTO_IMAGE_DOWNLOAD
3780+
if os.Getenv("SHUFFLE_AUTO_IMAGE_DOWNLOAD") == "false" {
3781+
log.Printf("[DEBUG] SHUFFLE_AUTO_IMAGE_DOWNLOAD is false. Not downloading image %s", imageName)
3782+
return nil
3783+
}
3784+
3785+
if ArrayContains(downloadedImages, imageName) && project.Environment == "worker" {
3786+
log.Printf("[DEBUG] Image %s already downloaded - not re-downloading. This only applies to workers.", imageName)
3787+
return nil
3788+
}
3789+
3790+
baseUrl := os.Getenv("BASE_URL")
3791+
log.Printf("[DEBUG] Trying to download image %s from backend %s as it doesn't exist. All images: %#v", imageName, baseUrl, downloadedImages)
3792+
3793+
downloadedImages = append(downloadedImages, imageName)
3794+
3795+
data := fmt.Sprintf(`{"name": "%s"}`, imageName)
3796+
dockerImgUrl := fmt.Sprintf("%s/api/v1/get_docker_image", baseUrl)
3797+
3798+
req, err := http.NewRequest(
3799+
"POST",
3800+
dockerImgUrl,
3801+
bytes.NewBuffer([]byte(data)),
3802+
)
3803+
3804+
// Specific to the worker
3805+
authorization := os.Getenv("AUTHORIZATION")
3806+
if len(authorization) > 0 {
3807+
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", authorization))
3808+
} else {
3809+
// Specific to Orborus auth (org + auth) -> environment auth
3810+
authorization = os.Getenv("AUTH")
3811+
if len(authorization) > 0 {
3812+
log.Printf("[DEBUG] Found Orborus environment auth - adding to header.")
3813+
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", authorization))
3814+
3815+
org := os.Getenv("ORG")
3816+
if len(org) > 0 {
3817+
req.Header.Add("Org-Id", org)
3818+
}
3819+
3820+
} else {
3821+
log.Printf("[WARNING] No auth found - running backend download without it.")
3822+
}
3823+
}
3824+
3825+
newresp, err := topClient.Do(req)
3826+
if err != nil {
3827+
log.Printf("[ERROR] Failed download request for %s: %s", imageName, err)
3828+
return err
3829+
}
3830+
3831+
defer newresp.Body.Close()
3832+
if newresp.StatusCode != 200 {
3833+
log.Printf("[ERROR] Docker download for image %s (backend) StatusCode (1): %d", imageName, newresp.StatusCode)
3834+
return errors.New(fmt.Sprintf("Failed to get image - status code %d", newresp.StatusCode))
3835+
}
3836+
3837+
newImageName := strings.Replace(imageName, "/", "_", -1)
3838+
newFileName := newImageName + ".tar"
3839+
3840+
tar, err := os.Create(newFileName)
3841+
if err != nil {
3842+
log.Printf("[WARNING] Failed creating file: %s", err)
3843+
return err
3844+
}
3845+
3846+
defer tar.Close()
3847+
_, err = io.Copy(tar, newresp.Body)
3848+
if err != nil {
3849+
log.Printf("[WARNING] Failed response body copying: %s", err)
3850+
return err
3851+
}
3852+
tar.Seek(0, 0)
3853+
3854+
dockercli, err := docker.NewEnvClient()
3855+
if err != nil {
3856+
log.Printf("[ERROR] Unable to create docker client (3): %s", err)
3857+
return err
3858+
}
3859+
3860+
defer dockercli.Close()
3861+
3862+
imageLoadResponse, err := dockercli.ImageLoad(context.Background(), tar, true)
3863+
if err != nil {
3864+
log.Printf("[ERROR] Error loading images: %s", err)
3865+
return err
3866+
}
3867+
3868+
defer imageLoadResponse.Body.Close()
3869+
body, err := ioutil.ReadAll(imageLoadResponse.Body)
3870+
if err != nil {
3871+
log.Printf("[ERROR] Error reading: %s", err)
3872+
return err
3873+
}
3874+
3875+
if strings.Contains(string(body), "no such file") {
3876+
return errors.New(string(body))
3877+
}
3878+
3879+
baseTag := strings.Split(imageName, ":")
3880+
if len(baseTag) > 1 {
3881+
tag := baseTag[1]
3882+
log.Printf("[DEBUG] Creating tag copies of downloaded containers from tag %s", tag)
3883+
3884+
// Remapping
3885+
ctx := context.Background()
3886+
dockercli.ImageTag(ctx, imageName, fmt.Sprintf("frikky/shuffle:%s", tag))
3887+
dockercli.ImageTag(ctx, imageName, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
3888+
3889+
downloadedImages = append(downloadedImages, fmt.Sprintf("frikky/shuffle:%s", tag))
3890+
downloadedImages = append(downloadedImages, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
3891+
3892+
}
3893+
3894+
os.Remove(newFileName)
3895+
3896+
log.Printf("[INFO] Successfully loaded image %s: %s", imageName, string(body))
3897+
return nil
3898+
}

db-connector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3724,7 +3724,6 @@ func GetOrg(ctx context.Context, id string) (*Org, error) {
37243724
} else {
37253725
key := datastore.NameKey(nameKey, id, nil)
37263726
if err := project.Dbclient.Get(ctx, key, curOrg); err != nil {
3727-
log.Printf("[ERROR] Error in org loading (2) for %s: %s", key, err)
37283727
//log.Printf("Users: %s", curOrg.Users)
37293728
if strings.Contains(err.Error(), `cannot load field`) && strings.Contains(err.Error(), `users`) && !strings.Contains(err.Error(), `users_last_session`) {
37303729
//Self correcting Org handler for user migration. This may come in handy if we change the structure of private apps later too.
@@ -3756,9 +3755,10 @@ func GetOrg(ctx context.Context, id string) (*Org, error) {
37563755
setOrg = true
37573756
}
37583757
} else if strings.Contains(err.Error(), `cannot load field`) {
3759-
log.Printf("[WARNING] Error in org loading (4), but returning without warning: %s", err)
3758+
//log.Printf("[WARNING] Error in org loading (4), but returning without warning: %s", err)
37603759
err = nil
37613760
} else {
3761+
log.Printf("[ERROR] Error in org loading (2) for %s: %s", key, err)
37623762
return &Org{}, err
37633763
}
37643764
}

oauth2.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3534,7 +3534,7 @@ func GetOauth2ApplicationPermissionToken(ctx context.Context, user User, appAuth
35343534
username := ""
35353535
password := ""
35363536

3537-
log.Printf("[DEBUG] Got %d auth fields (%s)", len(appAuth.Fields), appAuth.Id)
3537+
//log.Printf("[DEBUG] Got %d auth fields (%s)", len(appAuth.Fields), appAuth.Id)
35383538
for _, field := range appAuth.Fields {
35393539
if field.Key == "client_secret" {
35403540
clientSecret = field.Value
@@ -3580,22 +3580,15 @@ func GetOauth2ApplicationPermissionToken(ctx context.Context, user User, appAuth
35803580
refreshData += fmt.Sprintf("&scope=%s", strings.Replace(scope, ",", " ", -1))
35813581
}
35823582

3583-
35843583
if strings.Contains(refreshData, "user_impersonation") && strings.Contains (refreshData, "azure") && !strings.Contains(refreshData, "resource="){
35853584
// Add "resource" for microsoft hings
35863585
refreshData += "&resource=https://management.azure.com"
35873586
}
35883587

3589-
/*
3590-
if strings.Contains(refreshData, "client_credentials") && strings.Contains (refreshData, "azure") && !strings.Contains(refreshData, "resource="){
3591-
refreshData += "&resource=https://management.azure.com"
3592-
}
3593-
*/
3594-
3588+
// Not necessary for refresh
35953589
log.Printf("[DEBUG] Oauth2 REFRESH DATA: %#v. URL: %#v", refreshData, tokenUrl)
35963590

35973591
client := GetExternalClient(tokenUrl)
3598-
35993592
req, err := http.NewRequest(
36003593
"POST",
36013594
tokenUrl,
@@ -3619,14 +3612,15 @@ func GetOauth2ApplicationPermissionToken(ctx context.Context, user User, appAuth
36193612
return appAuth, err
36203613
}
36213614

3622-
log.Printf("[DEBUG] Oauth2 application auth Response for %s: %d", tokenUrl, newresp.StatusCode)
3623-
36243615
defer newresp.Body.Close()
36253616
body, err := ioutil.ReadAll(newresp.Body)
36263617
if err != nil {
3618+
log.Printf("[ERROR] Oauth2 application auth: Failed to read response body: %s", err)
36273619
return appAuth, err
36283620
}
36293621

3622+
log.Printf("[DEBUG] Oauth2 application auth Response for %s: %d", tokenUrl, newresp.StatusCode)
3623+
36303624
if newresp.StatusCode >= 300 {
36313625
// Printing on error to handle in future instances
36323626
log.Printf("[ERROR] Oauth2 application data for %s: %#v", tokenUrl, string(body))

shared.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6692,7 +6692,7 @@ func diffWorkflows(oldWorkflow Workflow, newWorkflow Workflow, update bool) {
66926692
if err != nil {
66936693
log.Printf("[WARNING] Failed updating child workflow %s: %s", childWorkflow.ID, err)
66946694
} else {
6695-
log.Printf("\n\n[INFO] Updated child workflow '%s' based on parent %s\n\n", childWorkflow.ID, oldWorkflow.ID)
6695+
log.Printf("[INFO] Updated child workflow '%s' based on parent %s", childWorkflow.ID, oldWorkflow.ID)
66966696

66976697
SetWorkflowRevision(ctx, childWorkflow)
66986698
passedOrg := Org{
@@ -7566,7 +7566,7 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
75667566

75677567
if added {
75687568
orgUpdated = true
7569-
log.Printf("[DEBUG] Org updated with new apps: %s", org.ActiveApps)
7569+
//log.Printf("[DEBUG] Org updated with new apps: %s", org.ActiveApps)
75707570

75717571
//DeleteCache(ctx, fmt.Sprintf("apps_%s", user.Id))
75727572
DeleteCache(ctx, fmt.Sprintf("workflowapps-sorted-100"))
@@ -15137,7 +15137,8 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut
1513715137
}
1513815138
}
1513915139

15140-
log.Printf("\n\n[DEBUG][%s] Found that %s (%s) should be skipped? Should check if it has more parents. If not, send in a skip\n\n", workflowExecution.ExecutionId, foundAction.Label, foundAction.AppName)
15140+
// FIXME: Debug logs necessary to understand how workflows finish?
15141+
log.Printf("[DEBUG][%s] Found that %s (%s) should be skipped? Should check if it has more parents. If not, send in a skip", workflowExecution.ExecutionId, foundAction.Label, foundAction.AppName)
1514115142

1514215143
foundCount := 0
1514315144
skippedBranches := []string{}
@@ -20885,16 +20886,16 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
2088520886
setAuth := false
2088620887
executionAuthKey := fmt.Sprintf("oauth2_%s", curAuth.Id)
2088720888

20888-
log.Printf("[DEBUG] Looking for cached authkey '%s'", executionAuthKey)
20889+
//log.Printf("[DEBUG] Looking for cached authkey '%s'", executionAuthKey)
2088920890
execAuthData, err := GetCache(ctx, executionAuthKey)
2089020891
if err == nil {
20891-
log.Printf("[DEBUG] Successfully retrieved auth wrapper from cache for %s", executionAuthKey)
20892+
//log.Printf("[DEBUG] Successfully retrieved auth wrapper from cache for %s", executionAuthKey)
2089220893
cacheData := []byte(execAuthData.([]uint8))
2089320894

2089420895
appAuthWrapper := AppAuthenticationStorage{}
2089520896
err = json.Unmarshal(cacheData, &appAuthWrapper)
2089620897
if err == nil {
20897-
log.Printf("[DEBUG] Successfully unmarshalled auth wrapper from cache for %s", executionAuthKey)
20898+
//log.Printf("[DEBUG] Successfully unmarshalled auth wrapper from cache for %s", executionAuthKey)
2089820899

2089920900
newParams = action.Parameters
2090020901
for _, param := range appAuthWrapper.Fields {

0 commit comments

Comments
 (0)