Skip to content

Sql insight ce #3095

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sqle/dms/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func convertInstance(instance *dmsV2.ListDBService) (*model.Instance, error) {
}, nil
}

func GetInstances(ctx context.Context, req dmsV2.ListDBServiceReq) ([]*model.Instance, error) {
return getInstances(ctx, req)
}
func GetInstancesInProject(ctx context.Context, projectUid string) ([]*model.Instance, error) {
return getInstances(ctx, dmsV2.ListDBServiceReq{
ProjectUid: projectUid,
Expand Down
21 changes: 21 additions & 0 deletions sqle/model/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

const globalConfigurationTablePrefix = "global_configuration"
const (
SystemVariableSqlManageRawExpiredHours = "system_variable_sql_manage_raw_expired_hours"
SystemVariableWorkflowExpiredHours = "system_variable_workflow_expired_hours"
SystemVariableSqleUrl = "system_variable_sqle_url"
SystemVariableOperationRecordExpiredHours = "system_variable_operation_record_expired_hours"
Expand All @@ -22,6 +23,7 @@ const (
)

const (
DefaultSqlManageRawExpiredHours = 30 * 24
DefaultOperationRecordExpiredHours = 90 * 24
DefaultCbOperationLogsExpiredHours = 90 * 24
)
Expand Down Expand Up @@ -122,6 +124,25 @@ func (s *Storage) GetWorkflowExpiredHoursOrDefault() (int64, error) {

return 30 * 24, nil
}
func (s *Storage) GetSqlManageRawSqlExpiredHoursOrDefault() (int64, error) {
var svs []SystemVariable
err := s.db.Find(&svs).Error
if err != nil {
return 0, errors.New(errors.ConnectStorageError, err)
}

for _, sv := range svs {
if sv.Key == SystemVariableSqlManageRawExpiredHours {
expiredHs, err := strconv.ParseInt(sv.Value, 10, 64)
if err != nil {
return 0, err
}
return expiredHs, nil
}
}

return DefaultSqlManageRawExpiredHours, nil
}

const (
ImTypeDingTalk = "dingTalk"
Expand Down
46 changes: 46 additions & 0 deletions sqle/model/instance_audit_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,3 +814,49 @@ func (s *Storage) UpdateManageSQLProcessByManageIDs(ids []uint, attrs map[string
err := s.db.Model(SQLManageRecordProcess{}).Where("sql_manage_record_id IN (?)", ids).Updates(attrs).Error
return errors.New(errors.ConnectStorageError, err)
}

func (s *Storage) IsAuditPlanEnabledOnInstance(instanceID string, auditPlanType string) (bool, error) {
var count int
query := fmt.Sprintf(`
select
count(1)
from
instance_audit_plans iap
join audit_plans_v2 apv
on
apv.instance_audit_plan_id = iap.id
where
iap.instance_id = ?
and iap.active_status = '%s'
and apv.type = ?
and apv.active_status = '%s'`, ActiveStatusNormal, ActiveStatusNormal)
err := s.db.Raw(query, instanceID, auditPlanType).Scan(&count).Error
if err != nil {
return false, errors.ConnectStorageErrWrapper(err)
}
return count > 0, nil
}

type SQLManageRawSQL struct {
Model

Source string `json:"source" gorm:"type:varchar(255);index:idx_db_source_exec_time,priority:2"`
SourceId string `json:"source_id" gorm:"type:varchar(255)"`
ProjectId string `json:"project_id" gorm:"type:varchar(255)"`
InstanceID string `json:"instance_id" gorm:"index:idx_db_source_exec_time,priority:1;type:varchar(255)"`
SchemaName string `json:"schema_name" gorm:"type:varchar(255)"`
SqlFingerprint string `json:"sql_fingerprint" gorm:"index,length:255;type:longtext;not null"`
SqlText string `json:"sql_text" gorm:"type:longtext;not null"`
Info JSON `gorm:"type:json"` // 慢日志的 执行时间等特殊属性
SQLID string `json:"sql_id" gorm:"type:varchar(255);not null"`
SqlExecTime time.Time `json:"sql_exec_time" gorm:"column:sql_exec_time;index:idx_db_source_exec_time,priority:3"`
}

func (s *Storage) CreateSqlManageRawSQLs(sqls []*SQLManageRawSQL) error {
return s.createSqlManageRawSQLs(sqls)
}

func (s *Storage) RemoveExpiredSQLFromRaw(expiredTime time.Time) (int64, error) {
result := s.db.Unscoped().Where("sql_exec_time < ?", expiredTime).Delete(&SQLManageRawSQL{})
return result.RowsAffected, result.Error
}
8 changes: 8 additions & 0 deletions sqle/model/sql_manage_insight_ce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//go:build !enterprise
// +build !enterprise

package model

func (s *Storage) createSqlManageRawSQLs(sqls []*SQLManageRawSQL) error {
return nil
}
5 changes: 3 additions & 2 deletions sqle/model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,9 @@ func ConvertAuditResultFromModelToDriver(mar *AuditResult) *driverV2.AuditResult

type ExecuteSQL struct {
BaseSQL
AuditStatus string `json:"audit_status" gorm:"default:\"initialized\""`
AuditResults AuditResults `json:"audit_results" gorm:"type:json"`
SqlFingerprint string `json:"sql_fingerprint" gorm:"index,length:255;type:longtext"`
AuditStatus string `json:"audit_status" gorm:"default:\"initialized\""`
AuditResults AuditResults `json:"audit_results" gorm:"type:json"`
// AuditFingerprint generate from SQL and SQL audit result use MD5 hash algorithm,
// it used for deduplication in one audit task.
AuditFingerprint string `json:"audit_fingerprint" gorm:"index;type:char(32)"`
Expand Down
1 change: 1 addition & 0 deletions sqle/model/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ var autoMigrateList = []interface{}{
&Tag{},
&Knowledge{},
&DataLock{},
&SQLManageRawSQL{},
}

func (s *Storage) AutoMigrate() error {
Expand Down
2 changes: 2 additions & 0 deletions sqle/server/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func hookAudit(l *logrus.Entry, task *model.Task, p driver.Plugin, hook AuditHoo
executeSQL.AuditStatus = model.SQLAuditStatusFinished
executeSQL.AuditLevel = string(result.Level())
executeSQL.AuditFingerprint = utils.Md5String(string(append([]byte(result.Message()), []byte(node.Fingerprint)...)))
executeSQL.SqlFingerprint = node.Fingerprint
appendExecuteSqlResults(executeSQL, result)
if err := st.UpdateSqlWhitelistMatchedInfo(matchedWhitelistID, 1, time.Now()); err != nil {
l.Errorf("update sql whitelist matched info error: %v", err)
Expand Down Expand Up @@ -233,6 +234,7 @@ func hookAudit(l *logrus.Entry, task *model.Task, p driver.Plugin, hook AuditHoo
sql.AuditStatus = model.SQLAuditStatusFinished
sql.AuditLevel = string(results[i].Level())
sql.AuditFingerprint = utils.Md5String(string(append([]byte(results[i].Message()), []byte(nodes[i].Fingerprint)...)))
sql.SqlFingerprint = nodes[i].Fingerprint
appendExecuteSqlResults(sql, results[i])
}
}
Expand Down
22 changes: 22 additions & 0 deletions sqle/server/auditplan/sql_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package auditplan
import (
"encoding/json"
"strconv"
"time"

"github.com/actiontech/sqle/sqle/model"
"github.com/actiontech/sqle/sqle/utils"
Expand Down Expand Up @@ -143,3 +144,24 @@ func ConvertSQLV2ToMangerSQLQueue(sql *SQLV2) *model.SQLManageQueue {
Info: data,
}
}

func ConvertSQLV2ToMangerRawSQL(sql *SQLV2) *model.SQLManageRawSQL {
data, _ := json.Marshal(sql.Info.ToMap()) // todo: 错误处理
execTimeStr := sql.Info.Get(MetricNameLastReceiveTimestamp).String()
execTime, err := time.Parse(time.RFC3339, execTimeStr)
if err != nil {
execTime = time.Now()
}
return &model.SQLManageRawSQL{
Source: sql.Source,
SourceId: sql.SourceId,
ProjectId: sql.ProjectId,
InstanceID: sql.InstanceID,
SchemaName: sql.SchemaName,
SqlFingerprint: sql.Fingerprint,
SqlText: sql.SQLContent,
Info: data,
SQLID: sql.SQLId,
SqlExecTime: execTime,
}
}
8 changes: 8 additions & 0 deletions sqle/server/auditplan/task_type_mysql_processlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ func (at *MySQLProcessListTaskV2) ExtractSQL(logger *logrus.Entry, ap *AuditPlan
return nil, nil
}
cache := NewSQLV2Cache()
rawSQLs := make([]*model.SQLManageRawSQL, 0, len(res))
sqlMinSecond := ap.Params.GetParam(paramKeySQLMinSecond).Int()

for i := range res {
if at.filterFullProcessList(res[i], sqlMinSecond, db.Db.GetConnectionID()) {
continue
Expand Down Expand Up @@ -125,11 +127,17 @@ func (at *MySQLProcessListTaskV2) ExtractSQL(logger *logrus.Entry, ap *AuditPlan

sqlV2.Info = info
sqlV2.GenSQLId()
rawSQLs = append(rawSQLs, ConvertSQLV2ToMangerRawSQL(sqlV2))
if err = at.AggregateSQL(cache, sqlV2); err != nil {
logger.Warnf("aggregate sql failed error : %v", err)
continue
}
}

if err := model.GetStorage().CreateSqlManageRawSQLs(rawSQLs); err != nil {
logger.Errorf("MySQLProcessListTaskV2 create sql manage raw sql failed, error: %v", err)
}

return cache.GetSQLs(), nil
}

Expand Down
22 changes: 22 additions & 0 deletions sqle/server/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (j *CleanJob) job(entry *logrus.Entry) {
// j.CleanExpiredWorkflows(entry) /* 不再自动销毁工单(目前没有使用场景)*/
j.CleanExpiredTasks(entry)
j.CleanExpiredOperationLog(entry)
j.CleanExpiredSqlManageRawSql(entry)
}

func (j *CleanJob) CleanExpiredWorkflows(entry *logrus.Entry) {
Expand Down Expand Up @@ -105,6 +106,27 @@ func (j *CleanJob) CleanExpiredOperationLog(entry *logrus.Entry) {
}
}

func (j *CleanJob) CleanExpiredSqlManageRawSql(entry *logrus.Entry) {
st := model.GetStorage()

expiredHours, err := st.GetSqlManageRawSqlExpiredHoursOrDefault()
if err != nil {
entry.Errorf("get sql manage raw sql expired hours error: %v", err)
return
}
expiredTime := time.Now().Add(time.Duration(-expiredHours) * time.Hour)

rowsAffected, err := st.RemoveExpiredSQLFromRaw(expiredTime)
if err != nil {
entry.Errorf("clean sql manage raw sql error: %s", err)
return
}

if rowsAffected > 0 {
entry.Infof("clean sql manage raw sql before %s success, count: %d", expiredTime.Format(time.RFC3339), rowsAffected)
}
}

func getOperationRecordExpiredHours(
s *model.Storage, entry *logrus.Entry) (operationRecordExpiredHours int) {

Expand Down
Loading