Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sqle/driver/mysql/audit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func runDefaultRulesInspectCase(t *testing.T, desc string, i *MysqlDriverImpl, s
rulepkg.DMLCheckAffectedRows: {},
rulepkg.DMLCheckSortColumnLength: {},
rulepkg.DDLCheckAllIndexNotNullConstraint: {},
rulepkg.DDLCheckTransactionNotCommitted: {},
rulepkg.DMLCheckAggregate: {},
rulepkg.DDLCheckColumnNotNULL: {},
rulepkg.DDLCheckTableRows: {},
Expand Down
124 changes: 3 additions & 121 deletions sqle/driver/mysql/plocale/active.en.toml

Large diffs are not rendered by default.

124 changes: 3 additions & 121 deletions sqle/driver/mysql/plocale/active.zh.toml

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions sqle/driver/mysql/plocale/message_zh.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ var (
DDLCheckAllIndexNotNullConstraintDesc = &i18n.Message{ID: "DDLCheckAllIndexNotNullConstraintDesc", Other: "建议为至少一个索引添加非空约束"}
DDLCheckAllIndexNotNullConstraintAnnotation = &i18n.Message{ID: "DDLCheckAllIndexNotNullConstraintAnnotation", Other: "所有索引字段均未做非空约束,请确认下表索引规划的合理性。"}
DDLCheckAllIndexNotNullConstraintMessage = &i18n.Message{ID: "DDLCheckAllIndexNotNullConstraintMessage", Other: "建议为至少一个索引添加非空约束"}
DDLCheckTransactionNotCommittedDesc = &i18n.Message{ID: "DDLCheckTransactionNotCommittedDesc", Other: "DDL执行前存在事务未提交"}
DDLCheckTransactionNotCommittedAnnotation = &i18n.Message{ID: "DDLCheckTransactionNotCommittedAnnotation", Other: "检查可能产生锁冲突的DDL操作执行前是否存在事务未提交,避免DDL操作与未提交事务产生冲突,导致锁等待或死锁问题。MySQL8之前版本会查询information_schema.innodb_trx所有记录,MySQL8版本会查询performance_schema.data_locks相关记录。"}
DDLCheckTransactionNotCommittedMessage = &i18n.Message{ID: "DDLCheckTransactionNotCommittedMessage", Other: "DDL执行前存在事务未提交, %s"}
DMLCheckWithLimitDesc = &i18n.Message{ID: "DMLCheckWithLimitDesc", Other: "DELETE/UPDATE 语句不能有LIMIT条件"}
DMLCheckWithLimitAnnotation = &i18n.Message{ID: "DMLCheckWithLimitAnnotation", Other: "DELETE/UPDATE 语句使用LIMIT条件将随机选取数据进行删除或者更新,业务无法预期"}
DMLCheckWithLimitMessage = &i18n.Message{ID: "DMLCheckWithLimitMessage", Other: "DELETE/UPDATE 语句不能有LIMIT条件"}
Expand Down
128 changes: 128 additions & 0 deletions sqle/driver/mysql/rule/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (

// inspector DDL rules
const (
DDLCheckTransactionNotCommitted = "ddl_check_transactions_not_committed"
DDLCheckPKWithoutIfNotExists = "ddl_check_table_without_if_not_exists"
DDLCheckObjectNameLength = "ddl_check_object_name_length"
DDLCheckObjectNameUsingKeyword = "ddl_check_object_name_using_keyword"
Expand Down Expand Up @@ -4896,6 +4897,133 @@ func checkAllIndexNotNullConstraint(input *RuleHandlerInput) error {
return nil
}

type checkTransactionNotCommittedBeforeDDLTableInfo struct {
Schema string
Table string
}

func (c checkTransactionNotCommittedBeforeDDLTableInfo) String() string {
if c.Table == "" {
return c.Schema
}
if c.Schema == "" {
return c.Table
}
return fmt.Sprintf("%s.%s", c.Schema, c.Table)
}

func checkTransactionNotCommittedBeforeDDL(input *RuleHandlerInput) error {
switch input.Node.(type) {
case ast.DDLNode:
default:
return nil
}

tables := extractDDLSchemaAndTable(input.Node)
if len(tables) == 0 {
return nil
}

const transactionExecSecs = 600

version, err := input.Ctx.GetMySQLMajorVersion()
if err != nil {
return err
}

for _, table := range tables {
if table.Schema == "" {
table.Schema = input.Ctx.CurrentSchema()
}
if table.Schema == "" {
continue
}

if input.Ctx.IsLowerCaseTableName() {
table.Schema, table.Table = strings.ToLower(table.Schema), strings.ToLower(table.Table)
}

if version >= 8 {
count, err := input.Ctx.CheckTableRelatedTransactionNotCommittedMySQL8(table.Schema, table.Table)
if err != nil {
return err
}
if count > 0 {
addResult(input.Res, input.Rule, input.Rule.Name,
fmt.Sprintf("performance_schema.data_locks存在%d条%s的相关记录", count, table))
return nil
}
} else {
count, ExecTimeoutCount, err := input.Ctx.CheckTransactionNotCommittedMySQL5(transactionExecSecs)
if err != nil {
return err
}
if count > 0 || ExecTimeoutCount > 0 {
addResult(input.Res, input.Rule, input.Rule.Name,
fmt.Sprintf("information_schema.innodb_trx存在%d条记录, %d条执行时长超过%d秒", count, ExecTimeoutCount, transactionExecSecs))
return nil
}
}
}

return nil
}

// extractDDLSchemaAndTable 从DDL语句中提取schema和table name
func extractDDLSchemaAndTable(node ast.Node) []checkTransactionNotCommittedBeforeDDLTableInfo {
var tables []checkTransactionNotCommittedBeforeDDLTableInfo

switch stmt := node.(type) {
case *ast.CreateTableStmt, *ast.CreateIndexStmt, *ast.CreateViewStmt, *ast.CreateSequenceStmt, *ast.CreateDatabaseStmt:
return nil
case *ast.AlterTableStmt:
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
Schema: stmt.Table.Schema.O,
Table: stmt.Table.Name.O,
})
case *ast.DropTableStmt:
for _, table := range stmt.Tables {
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
Schema: table.Schema.O,
Table: table.Name.O,
})
}
case *ast.DropIndexStmt:
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
Schema: stmt.Table.Schema.O,
Table: stmt.Table.Name.O,
})
case *ast.DropSequenceStmt:
for _, sequence := range stmt.Sequences {
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
Schema: sequence.Schema.O,
Table: sequence.Name.O,
})
}
case *ast.RenameTableStmt:
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
Schema: stmt.OldTable.Schema.O,
Table: stmt.OldTable.Name.O,
})
case *ast.TruncateTableStmt:
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
Schema: stmt.Table.Schema.O,
Table: stmt.Table.Name.O,
})
case *ast.RepairTableStmt:
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
Schema: stmt.Table.Schema.O,
Table: stmt.Table.Name.O,
})
case *ast.DropDatabaseStmt:
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
Schema: stmt.Name,
})
}

return tables
}

func checkInsertSelect(input *RuleHandlerInput) error {
if stmt, ok := input.Node.(*ast.InsertStmt); ok {
if stmt.Select != nil {
Expand Down
12 changes: 12 additions & 0 deletions sqle/driver/mysql/rule/rule_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,18 @@ var sourceRuleHandlers = []*SourceHandler{
Message: plocale.DDLCheckAllIndexNotNullConstraintMessage,
Func: checkAllIndexNotNullConstraint,
},
{
Rule: SourceRule{
Name: DDLCheckTransactionNotCommitted,
Desc: plocale.DDLCheckTransactionNotCommittedDesc,
Annotation: plocale.DDLCheckTransactionNotCommittedAnnotation,
Level: driverV2.RuleLevelWarn,
Category: plocale.RuleTypeUsageSuggestion,
AllowOffline: false,
},
Message: plocale.DDLCheckTransactionNotCommittedMessage,
Func: checkTransactionNotCommittedBeforeDDL,
},
{
Rule: SourceRule{
Name: DMLCheckWithLimit,
Expand Down
78 changes: 78 additions & 0 deletions sqle/driver/mysql/session/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,3 +1358,81 @@ func (c *Context) fetchExecutionPlanWithWarnings(sql string) (*executor.ExplainW
Warnings: WarningsRecords,
}, nil
}

// GetMySQLMajorVersion 返回 MySQL 主版本号(如 5、8),离线或无 executor 时返回 0。
func (c *Context) GetMySQLMajorVersion() (int, error) {
if c.e == nil {
return 0, nil
}
results, err := c.e.Db.Query("SELECT @@version AS version")
if err != nil {
return 0, err
}
if len(results) == 0 {
return 0, nil
}
v, ok := results[0]["version"]
if !ok || !v.Valid {
return 0, nil
}
parts := strings.SplitN(v.String, ".", 2)
if len(parts) == 0 {
return 0, nil
}
major, err := strconv.Atoi(parts[0])
if err != nil {
return 0, err
}
return major, nil
}

// CheckTableRelatedTransactionNotCommittedMySQL8 查询 performance_schema.data_locks 中与指定表相关的未提交事务/锁记录数(MySQL 8+)。
func (c *Context) CheckTableRelatedTransactionNotCommittedMySQL8(schema, table string) (int, error) {
if c.e == nil {
return 0, nil
}
query := "SELECT COUNT(*) AS cnt FROM performance_schema.data_locks WHERE OBJECT_SCHEMA = ? AND OBJECT_NAME = ?"
results, err := c.e.Db.Query(query, schema, table)
if err != nil {
return 0, err
}
if len(results) == 0 {
return 0, nil
}
cntStr, ok := results[0]["cnt"]
if !ok || !cntStr.Valid {
return 0, nil
}
cnt, err := strconv.Atoi(cntStr.String)
if err != nil {
return 0, err
}
return cnt, nil
}

// CheckTransactionNotCommittedMySQL5 查询 information_schema.innodb_trx 中未提交事务数,以及执行超过 execTimeoutSecs 秒的事务数(MySQL 5)。
func (c *Context) CheckTransactionNotCommittedMySQL5(execTimeoutSecs int) (count int, execTimeoutCount int, err error) {
if c.e == nil {
return 0, 0, nil
}
results, err := c.e.Db.Query("SELECT trx_id, trx_started FROM information_schema.innodb_trx")
if err != nil {
return 0, 0, err
}
count = len(results)
if execTimeoutSecs <= 0 {
return count, 0, nil
}
// 统计执行超过 execTimeoutSecs 秒的事务需要按 trx_started 与当前时间比较,这里简化:仅返回总数,execTimeoutCount 由调用方用额外查询或在此用原生 SQL 计算
timeoutQuery := fmt.Sprintf("SELECT COUNT(*) AS cnt FROM information_schema.innodb_trx WHERE TIMESTAMPDIFF(SECOND, trx_started, NOW()) > %d", execTimeoutSecs)
timeoutResults, err := c.e.Db.Query(timeoutQuery)
if err != nil {
return count, 0, nil
}
if len(timeoutResults) > 0 {
if n, ok := timeoutResults[0]["cnt"]; ok && n.Valid {
execTimeoutCount, _ = strconv.Atoi(n.String)
}
}
return count, execTimeoutCount, nil
}
Loading