diff --git a/sqle/dms/instance.go b/sqle/dms/instance.go index 69afd8cc8..471f80111 100644 --- a/sqle/dms/instance.go +++ b/sqle/dms/instance.go @@ -149,6 +149,9 @@ func convertInstance(instance *dmsV2.ListDBService) (*model.Instance, error) { }, nil } +func GetInstances(ctx context.Context, req dmsV2.ListDBServiceReq) ([]*model.Instance, error) { + return getInstances(ctx, req) +} func GetInstancesInProject(ctx context.Context, projectUid string) ([]*model.Instance, error) { return getInstances(ctx, dmsV2.ListDBServiceReq{ ProjectUid: projectUid, diff --git a/sqle/model/configuration.go b/sqle/model/configuration.go index 75570a6d0..8bf53a97e 100644 --- a/sqle/model/configuration.go +++ b/sqle/model/configuration.go @@ -14,6 +14,7 @@ import ( const globalConfigurationTablePrefix = "global_configuration" const ( + SystemVariableSqlManageRawExpiredHours = "system_variable_sql_manage_raw_expired_hours" SystemVariableWorkflowExpiredHours = "system_variable_workflow_expired_hours" SystemVariableSqleUrl = "system_variable_sqle_url" SystemVariableOperationRecordExpiredHours = "system_variable_operation_record_expired_hours" @@ -22,6 +23,7 @@ const ( ) const ( + DefaultSqlManageRawExpiredHours = 30 * 24 DefaultOperationRecordExpiredHours = 90 * 24 DefaultCbOperationLogsExpiredHours = 90 * 24 ) @@ -122,6 +124,25 @@ func (s *Storage) GetWorkflowExpiredHoursOrDefault() (int64, error) { return 30 * 24, nil } +func (s *Storage) GetSqlManageRawSqlExpiredHoursOrDefault() (int64, error) { + var svs []SystemVariable + err := s.db.Find(&svs).Error + if err != nil { + return 0, errors.New(errors.ConnectStorageError, err) + } + + for _, sv := range svs { + if sv.Key == SystemVariableSqlManageRawExpiredHours { + expiredHs, err := strconv.ParseInt(sv.Value, 10, 64) + if err != nil { + return 0, err + } + return expiredHs, nil + } + } + + return DefaultSqlManageRawExpiredHours, nil +} const ( ImTypeDingTalk = "dingTalk" diff --git a/sqle/model/instance_audit_plan.go b/sqle/model/instance_audit_plan.go index 21bfa6768..c3521d0cd 100644 --- a/sqle/model/instance_audit_plan.go +++ b/sqle/model/instance_audit_plan.go @@ -814,3 +814,49 @@ func (s *Storage) UpdateManageSQLProcessByManageIDs(ids []uint, attrs map[string err := s.db.Model(SQLManageRecordProcess{}).Where("sql_manage_record_id IN (?)", ids).Updates(attrs).Error return errors.New(errors.ConnectStorageError, err) } + +func (s *Storage) IsAuditPlanEnabledOnInstance(instanceID string, auditPlanType string) (bool, error) { + var count int + query := fmt.Sprintf(` +select + count(1) +from + instance_audit_plans iap +join audit_plans_v2 apv +on + apv.instance_audit_plan_id = iap.id +where + iap.instance_id = ? + and iap.active_status = '%s' + and apv.type = ? + and apv.active_status = '%s'`, ActiveStatusNormal, ActiveStatusNormal) + err := s.db.Raw(query, instanceID, auditPlanType).Scan(&count).Error + if err != nil { + return false, errors.ConnectStorageErrWrapper(err) + } + return count > 0, nil +} + +type SQLManageRawSQL struct { + Model + + Source string `json:"source" gorm:"type:varchar(255);index:idx_db_source_exec_time,priority:2"` + SourceId string `json:"source_id" gorm:"type:varchar(255)"` + ProjectId string `json:"project_id" gorm:"type:varchar(255)"` + InstanceID string `json:"instance_id" gorm:"index:idx_db_source_exec_time,priority:1;type:varchar(255)"` + SchemaName string `json:"schema_name" gorm:"type:varchar(255)"` + SqlFingerprint string `json:"sql_fingerprint" gorm:"index,length:255;type:longtext;not null"` + SqlText string `json:"sql_text" gorm:"type:longtext;not null"` + Info JSON `gorm:"type:json"` // 慢日志的 执行时间等特殊属性 + SQLID string `json:"sql_id" gorm:"type:varchar(255);not null"` + SqlExecTime time.Time `json:"sql_exec_time" gorm:"column:sql_exec_time;index:idx_db_source_exec_time,priority:3"` +} + +func (s *Storage) CreateSqlManageRawSQLs(sqls []*SQLManageRawSQL) error { + return s.createSqlManageRawSQLs(sqls) +} + +func (s *Storage) RemoveExpiredSQLFromRaw(expiredTime time.Time) (int64, error) { + result := s.db.Unscoped().Where("sql_exec_time < ?", expiredTime).Delete(&SQLManageRawSQL{}) + return result.RowsAffected, result.Error +} diff --git a/sqle/model/sql_manage_insight_ce.go b/sqle/model/sql_manage_insight_ce.go new file mode 100644 index 000000000..9d5df0f1d --- /dev/null +++ b/sqle/model/sql_manage_insight_ce.go @@ -0,0 +1,8 @@ +//go:build !enterprise +// +build !enterprise + +package model + +func (s *Storage) createSqlManageRawSQLs(sqls []*SQLManageRawSQL) error { + return nil +} diff --git a/sqle/model/task.go b/sqle/model/task.go index 2d3f99689..e2abc47fa 100644 --- a/sqle/model/task.go +++ b/sqle/model/task.go @@ -331,8 +331,9 @@ func ConvertAuditResultFromModelToDriver(mar *AuditResult) *driverV2.AuditResult type ExecuteSQL struct { BaseSQL - AuditStatus string `json:"audit_status" gorm:"default:\"initialized\""` - AuditResults AuditResults `json:"audit_results" gorm:"type:json"` + SqlFingerprint string `json:"sql_fingerprint" gorm:"index,length:255;type:longtext"` + AuditStatus string `json:"audit_status" gorm:"default:\"initialized\""` + AuditResults AuditResults `json:"audit_results" gorm:"type:json"` // AuditFingerprint generate from SQL and SQL audit result use MD5 hash algorithm, // it used for deduplication in one audit task. AuditFingerprint string `json:"audit_fingerprint" gorm:"index;type:char(32)"` diff --git a/sqle/model/utils.go b/sqle/model/utils.go index 230b77211..c4d9a015c 100644 --- a/sqle/model/utils.go +++ b/sqle/model/utils.go @@ -183,6 +183,7 @@ var autoMigrateList = []interface{}{ &Tag{}, &Knowledge{}, &DataLock{}, + &SQLManageRawSQL{}, } func (s *Storage) AutoMigrate() error { diff --git a/sqle/server/audit.go b/sqle/server/audit.go index b0dce20af..0126c674a 100644 --- a/sqle/server/audit.go +++ b/sqle/server/audit.go @@ -205,6 +205,7 @@ func hookAudit(l *logrus.Entry, task *model.Task, p driver.Plugin, hook AuditHoo executeSQL.AuditStatus = model.SQLAuditStatusFinished executeSQL.AuditLevel = string(result.Level()) executeSQL.AuditFingerprint = utils.Md5String(string(append([]byte(result.Message()), []byte(node.Fingerprint)...))) + executeSQL.SqlFingerprint = node.Fingerprint appendExecuteSqlResults(executeSQL, result) if err := st.UpdateSqlWhitelistMatchedInfo(matchedWhitelistID, 1, time.Now()); err != nil { l.Errorf("update sql whitelist matched info error: %v", err) @@ -233,6 +234,7 @@ func hookAudit(l *logrus.Entry, task *model.Task, p driver.Plugin, hook AuditHoo sql.AuditStatus = model.SQLAuditStatusFinished sql.AuditLevel = string(results[i].Level()) sql.AuditFingerprint = utils.Md5String(string(append([]byte(results[i].Message()), []byte(nodes[i].Fingerprint)...))) + sql.SqlFingerprint = nodes[i].Fingerprint appendExecuteSqlResults(sql, results[i]) } } diff --git a/sqle/server/auditplan/sql_info.go b/sqle/server/auditplan/sql_info.go index b8a69bf1d..2656c1472 100644 --- a/sqle/server/auditplan/sql_info.go +++ b/sqle/server/auditplan/sql_info.go @@ -3,6 +3,7 @@ package auditplan import ( "encoding/json" "strconv" + "time" "github.com/actiontech/sqle/sqle/model" "github.com/actiontech/sqle/sqle/utils" @@ -143,3 +144,24 @@ func ConvertSQLV2ToMangerSQLQueue(sql *SQLV2) *model.SQLManageQueue { Info: data, } } + +func ConvertSQLV2ToMangerRawSQL(sql *SQLV2) *model.SQLManageRawSQL { + data, _ := json.Marshal(sql.Info.ToMap()) // todo: 错误处理 + execTimeStr := sql.Info.Get(MetricNameLastReceiveTimestamp).String() + execTime, err := time.Parse(time.RFC3339, execTimeStr) + if err != nil { + execTime = time.Now() + } + return &model.SQLManageRawSQL{ + Source: sql.Source, + SourceId: sql.SourceId, + ProjectId: sql.ProjectId, + InstanceID: sql.InstanceID, + SchemaName: sql.SchemaName, + SqlFingerprint: sql.Fingerprint, + SqlText: sql.SQLContent, + Info: data, + SQLID: sql.SQLId, + SqlExecTime: execTime, + } +} diff --git a/sqle/server/auditplan/task_type_mysql_processlist.go b/sqle/server/auditplan/task_type_mysql_processlist.go index 5cb8f739f..4662ac480 100644 --- a/sqle/server/auditplan/task_type_mysql_processlist.go +++ b/sqle/server/auditplan/task_type_mysql_processlist.go @@ -91,7 +91,9 @@ func (at *MySQLProcessListTaskV2) ExtractSQL(logger *logrus.Entry, ap *AuditPlan return nil, nil } cache := NewSQLV2Cache() + rawSQLs := make([]*model.SQLManageRawSQL, 0, len(res)) sqlMinSecond := ap.Params.GetParam(paramKeySQLMinSecond).Int() + for i := range res { if at.filterFullProcessList(res[i], sqlMinSecond, db.Db.GetConnectionID()) { continue @@ -125,11 +127,17 @@ func (at *MySQLProcessListTaskV2) ExtractSQL(logger *logrus.Entry, ap *AuditPlan sqlV2.Info = info sqlV2.GenSQLId() + rawSQLs = append(rawSQLs, ConvertSQLV2ToMangerRawSQL(sqlV2)) if err = at.AggregateSQL(cache, sqlV2); err != nil { logger.Warnf("aggregate sql failed error : %v", err) continue } } + + if err := model.GetStorage().CreateSqlManageRawSQLs(rawSQLs); err != nil { + logger.Errorf("MySQLProcessListTaskV2 create sql manage raw sql failed, error: %v", err) + } + return cache.GetSQLs(), nil } diff --git a/sqle/server/clean.go b/sqle/server/clean.go index 7b8bd9a7e..f9cf364b6 100644 --- a/sqle/server/clean.go +++ b/sqle/server/clean.go @@ -30,6 +30,7 @@ func (j *CleanJob) job(entry *logrus.Entry) { // j.CleanExpiredWorkflows(entry) /* 不再自动销毁工单(目前没有使用场景)*/ j.CleanExpiredTasks(entry) j.CleanExpiredOperationLog(entry) + j.CleanExpiredSqlManageRawSql(entry) } func (j *CleanJob) CleanExpiredWorkflows(entry *logrus.Entry) { @@ -105,6 +106,27 @@ func (j *CleanJob) CleanExpiredOperationLog(entry *logrus.Entry) { } } +func (j *CleanJob) CleanExpiredSqlManageRawSql(entry *logrus.Entry) { + st := model.GetStorage() + + expiredHours, err := st.GetSqlManageRawSqlExpiredHoursOrDefault() + if err != nil { + entry.Errorf("get sql manage raw sql expired hours error: %v", err) + return + } + expiredTime := time.Now().Add(time.Duration(-expiredHours) * time.Hour) + + rowsAffected, err := st.RemoveExpiredSQLFromRaw(expiredTime) + if err != nil { + entry.Errorf("clean sql manage raw sql error: %s", err) + return + } + + if rowsAffected > 0 { + entry.Infof("clean sql manage raw sql before %s success, count: %d", expiredTime.Format(time.RFC3339), rowsAffected) + } +} + func getOperationRecordExpiredHours( s *model.Storage, entry *logrus.Entry) (operationRecordExpiredHours int) {