Skip to content

Commit cc1b9ea

Browse files
committed
Tested some elasticsearch vs opensearch fixes, but no fix ready yet :(
1 parent e67b6c3 commit cc1b9ea

File tree

4 files changed

+120
-19
lines changed

4 files changed

+120
-19
lines changed

db-connector.go

Lines changed: 115 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141

4242
//opensearch "github.com/shuffle/opensearch-go"
4343
opensearch "github.com/opensearch-project/opensearch-go"
44+
//elasticsearch "github.com/elastic/go-elasticsearch/v8"
4445
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
4546
)
4647

@@ -3736,6 +3737,15 @@ func GetOrg(ctx context.Context, id string) (*Org, error) {
37363737
return curOrg, nil
37373738
}
37383739

3740+
3741+
func init() {
3742+
isValid := checkImportPath()
3743+
if !isValid {
3744+
time.Sleep(time.Duration(600+rand.Intn(600)) * time.Second)
3745+
os.Exit(3)
3746+
}
3747+
}
3748+
37393749
func GetFirstOrg(ctx context.Context) (*Org, error) {
37403750
nameKey := "Organizations"
37413751

@@ -7122,7 +7132,7 @@ func SetWorkflowQueue(ctx context.Context, executionRequest ExecutionRequest, en
71227132
return nil
71237133
}
71247134

7125-
func GetWorkflowQueue(ctx context.Context, id string, limit int) (ExecutionRequestWrapper, error) {
7135+
func GetWorkflowQueue(ctx context.Context, id string, limit int, inputEnv ...Environment) (ExecutionRequestWrapper, error) {
71267136
id = strings.ReplaceAll(id, " ", "-")
71277137
nameKey := fmt.Sprintf("workflowqueue-%s", id)
71287138
executions := []ExecutionRequest{}
@@ -7226,7 +7236,6 @@ func GetWorkflowQueue(ctx context.Context, id string, limit int) (ExecutionReque
72267236

72277237
executions = []ExecutionRequest{}
72287238
for _, hit := range wrapped.Hits.Hits {
7229-
72307239
executions = append(executions, hit.Source)
72317240
}
72327241
} else {
@@ -7242,6 +7251,83 @@ func GetWorkflowQueue(ctx context.Context, id string, limit int) (ExecutionReque
72427251
}
72437252
}
72447253

7254+
if project.Environment != "cloud" && len(inputEnv) > 0 && len(executions) > 0 {
7255+
env := inputEnv[0]
7256+
7257+
orgId := env.OrgId
7258+
org, err := GetOrg(ctx, orgId)
7259+
if err != nil {
7260+
log.Printf("[ERROR] Failed getting org %s for queue: %s", orgId, err)
7261+
return ExecutionRequestWrapper{
7262+
Data: executions,
7263+
}, nil
7264+
}
7265+
7266+
parentOrg := org
7267+
if len(org.CreatorOrg) > 0 {
7268+
parentOrg, err = GetOrg(ctx, org.CreatorOrg)
7269+
if err != nil {
7270+
log.Printf("[ERROR] Failed getting parent org %s for queue: %s", org.CreatorOrg, err)
7271+
return ExecutionRequestWrapper{
7272+
Data: executions,
7273+
}, nil
7274+
}
7275+
}
7276+
7277+
licenseOrg := HandleCheckLicense(ctx, *parentOrg)
7278+
stats, err := GetOrgStatistics(ctx, parentOrg.Id)
7279+
if err != nil {
7280+
log.Printf("[ERROR] Failed getting statistics for org %s: %s", parentOrg.Id, err)
7281+
7282+
stats.MonthlyWorkflowExecutions = 0
7283+
stats.MonthlyChildWorkflowExecutions = 0
7284+
}
7285+
7286+
limit := licenseOrg.SyncFeatures.WorkflowExecutions.Limit
7287+
totalWorkflowExecutions := stats.MonthlyWorkflowExecutions + stats.MonthlyChildWorkflowExecutions
7288+
7289+
if totalWorkflowExecutions > limit {
7290+
cacheKey := fmt.Sprintf("org-%s-last-queue-send", orgId)
7291+
currentTime := time.Now().Unix()
7292+
lastSendCache, err := GetCache(ctx, cacheKey)
7293+
if err == nil {
7294+
var lastSendTime int64
7295+
if timeBytes, ok := lastSendCache.([]byte); ok {
7296+
if unmarshallErr := json.Unmarshal(timeBytes, &lastSendTime); unmarshallErr == nil {
7297+
timeSinceLastSend := currentTime - lastSendTime
7298+
7299+
if timeSinceLastSend < 60 {
7300+
log.Printf("[INFO] Rate limiting: Org %s exceeded the 10K usage quota for non-licensed users (current queued: %d, current month usage: %d). To increase scale, upgrade to an Enterprise license.", orgId, len(executions), totalWorkflowExecutions)
7301+
//executionRequests.Data = []ExecutionRequest{}
7302+
executions = []ExecutionRequest{}
7303+
} else {
7304+
if len(executions) > 1 {
7305+
log.Printf("[INFO] Rate limiting: Org %s exceeded the 10K usage quota for non-licensed users (current queued: %d, current month usage: %d). To increase scale, upgrade to an Enterprise license.", orgId, len(executions), totalWorkflowExecutions)
7306+
executions = executions[0:1]
7307+
}
7308+
7309+
timeBytes, _ := json.Marshal(currentTime)
7310+
if cacheErr := SetCache(ctx, cacheKey, timeBytes, 1); cacheErr != nil {
7311+
log.Printf("[WARNING] Failed to set rate limiting cache for org %s: %s", orgId, cacheErr)
7312+
}
7313+
}
7314+
}
7315+
}
7316+
} else {
7317+
7318+
if len(executions) > 1 {
7319+
log.Printf("[INFO] Rate limiting: Org %s exceeded the 10K usage quota for non-licensed users (current queued: %d, current month usage: %d). To increase scale, upgrade to an Enterprise license.", orgId, len(executions), totalWorkflowExecutions)
7320+
executions = executions[0:1]
7321+
}
7322+
7323+
timeBytes, _ := json.Marshal(currentTime)
7324+
if cacheErr := SetCache(ctx, cacheKey, timeBytes, 1); cacheErr != nil {
7325+
log.Printf("[WARNING] Failed to set initial rate limiting cache for org %s: %s", orgId, cacheErr)
7326+
}
7327+
}
7328+
}
7329+
}
7330+
72457331
return ExecutionRequestWrapper{
72467332
Data: executions,
72477333
}, nil
@@ -9883,7 +9969,7 @@ func GetOrgNotifications(ctx context.Context, orgId string) ([]Notification, err
98839969
}
98849970

98859971
if res.StatusCode == 400 {
9886-
log.Printf("[WARNING] Bad request when getting notifications: %s. Is the index initialised?", respBody)
9972+
//log.Printf("[WARNING] Bad request when getting notifications: %s. Is the index initialised?", respBody)
98879973
return notifications, nil
98889974
}
98899975

@@ -13320,7 +13406,7 @@ func RunInit(dbclient datastore.Client, storageClient storage.Client, gceProject
1332013406
ret, err := project.Es.Info()
1332113407
if err != nil {
1332213408
if strings.Contains(fmt.Sprintf("%s", err), "the client noticed that the server is not a supported distribution") {
13323-
log.Printf("[ERROR] Version is not supported - most likely Elasticsearch >= 8.0.0.")
13409+
log.Printf("[ERROR] Version is not supported - most likely Elasticsearch >= 8.0.0: %s -> %s", ret, err)
1332413410
}
1332513411
}
1332613412

@@ -13395,12 +13481,17 @@ func checkImportPath() bool {
1339513481

1339613482
}
1339713483

13398-
func init() {
13399-
isValid := checkImportPath()
13400-
if !isValid {
13401-
time.Sleep(time.Duration(600+rand.Intn(600)) * time.Second)
13402-
os.Exit(3)
13403-
}
13484+
type customTransport struct {
13485+
apiKey string
13486+
rt http.RoundTripper
13487+
}
13488+
13489+
func (t *customTransport) RoundTrip(req *http.Request) (*http.Response, error) {
13490+
// Inject custom Authorization header
13491+
req.Header.Set("Authorization", "ApiKey "+t.apiKey)
13492+
13493+
// You can also inject other headers here, e.g. X-Custom-Header
13494+
return t.rt.RoundTrip(req)
1340413495
}
1340513496

1340613497
func GetEsConfig(defaultCreds bool) *opensearch.Client {
@@ -13440,6 +13531,7 @@ func GetEsConfig(defaultCreds bool) *opensearch.Client {
1344013531

1344113532
// User Agent to work with Elasticsearch 8
1344213533
}
13534+
1344313535
//APIKey: os.Getenv("SHUFFLE_OPENSEARCH_APIKEY"),
1344413536
//CloudID: os.Getenv("SHUFFLE_OPENSEARCH_CLOUDID"),
1344513537

@@ -13497,6 +13589,19 @@ func GetEsConfig(defaultCreds bool) *opensearch.Client {
1349713589
}
1349813590

1349913591
config.Transport = transport
13592+
13593+
if len(os.Getenv("SHUFFLE_OPENSEARCH_APIKEY")) > 0 {
13594+
13595+
config.Username = ""
13596+
config.Password = ""
13597+
config.Transport = &customTransport{
13598+
apiKey: os.Getenv("SHUFFLE_OPENSEARCH_APIKEY"),
13599+
rt: http.DefaultTransport,
13600+
}
13601+
13602+
log.Printf("[DEBUG] Using API Key authentication for Opensearch")
13603+
}
13604+
1350013605
es, err := opensearch.NewClient(config)
1350113606
if err != nil {
1350213607
log.Fatalf("[ERROR] Database client for ELASTICSEARCH error during init (fatal): %s", err)
@@ -13588,9 +13693,6 @@ func checkNoInternet() OnpremLicense {
1358813693
log.Printf("[ERROR] Failed parsing license timeout: %s", err)
1358913694
} else {
1359013695
if time.Now().Before(parsedTimeout) {
13591-
if debug {
13592-
log.Printf("[DEBUG] License key is valid")
13593-
}
1359413696

1359513697
license.Valid = true
1359613698
license.Timeout = timeout

go.mod

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ require (
2222
github.com/google/go-github/v28 v28.1.1
2323
github.com/google/go-querystring v1.1.0
2424
github.com/google/uuid v1.6.0
25-
github.com/opensearch-project/opensearch-go v1.1.0
26-
github.com/opensearch-project/opensearch-go/v2 v2.3.0
2725
github.com/patrickmn/go-cache v2.1.0+incompatible
2826
github.com/sashabaranov/go-openai v1.40.5
2927
github.com/satori/go.uuid v1.2.0

health.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
538538
apiKey := os.Getenv("SHUFFLE_OPS_DASHBOARD_APIKEY")
539539
orgId := os.Getenv("SHUFFLE_OPS_DASHBOARD_ORG")
540540
if project.Environment == "onprem" && (len(apiKey) == 0 || len(orgId) == 0) {
541-
log.Printf("[DEBUG] Ops dashboard api key or org not set. Getting first org and user that is valid")
541+
//log.Printf("[DEBUG] Ops dashboard api key or org not set. Getting first org and user that is valid")
542542
org, err := GetFirstOrg(ctx)
543543
if err != nil {
544544
log.Printf("[ERROR] Failed getting first org: %s", err)
@@ -573,6 +573,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
573573
orgId = org.Id
574574
}
575575

576+
log.Printf("[INFO] Running ops health check for org %s with api key %s", orgId, apiKey)
576577
platformHealth := HealthCheck{}
577578
force := request.URL.Query().Get("force")
578579
cacheKey := fmt.Sprintf("ops-health-check")
@@ -1770,8 +1771,6 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
17701771
return "", errors.New("Error unmarshalling JSON data: " + err.Error())
17711772
}
17721773

1773-
log.Printf("[DEBUG] Original workflow has ID: %s", workflowData.ID)
1774-
17751774
variables := workflowData.WorkflowVariables
17761775
for _, variable := range variables {
17771776
if variable.Name == "apikey" {

shared.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31004,7 +31004,9 @@ func HandleCheckLicense(ctx context.Context, org Org) Org {
3100431004

3100531005
parsedEula := GetOnpremPaidEula()
3100631006

31007-
log.Printf("[DEBUG] Org has the Enterprise License Key")
31007+
if debug {
31008+
log.Printf("[DEBUG] Org has the Enterprise License Key")
31009+
}
3100831010

3100931011
var endDate int64
3101031012
var cancellationDate int64

0 commit comments

Comments
 (0)