diff --git a/kernel/api/transaction.go b/kernel/api/transaction.go index ff4a61f8e..fac594d81 100644 --- a/kernel/api/transaction.go +++ b/kernel/api/transaction.go @@ -94,5 +94,8 @@ func pushTransactions(app, session string, transactions []*model.Transaction) { evt.AppId = app evt.SessionId = session evt.Data = transactions + for _, tx := range transactions { + tx.WaitForCommit() + } util.PushEvent(evt) } diff --git a/kernel/model/transaction.go b/kernel/model/transaction.go index a6efdf2a4..f53ec646e 100644 --- a/kernel/model/transaction.go +++ b/kernel/model/transaction.go @@ -23,6 +23,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" "github.com/88250/gulu" @@ -128,6 +129,7 @@ func flushTx(tx *Transaction) { func PerformTransactions(transactions *[]*Transaction) { for _, tx := range *transactions { + tx.m = &sync.Mutex{} txQueue <- tx } return @@ -1168,8 +1170,6 @@ type Operation struct { Format string `json:"format"` // 属性视图列格式化 KeyID string `json:"keyID"` // 属性视列 ID RowID string `json:"rowID"` // 属性视图行 ID - - discard bool // 用于标识是否在事务合并中丢弃 } type Transaction struct { @@ -1181,6 +1181,18 @@ type Transaction struct { nodes map[string]*ast.Node luteEngine *lute.Lute + m *sync.Mutex + state atomic.Int32 // 0: 未提交,1: 已提交,2: 已回滚 +} + +func (tx *Transaction) WaitForCommit() { + for { + if 0 == tx.state.Load() { + time.Sleep(10 * time.Millisecond) + continue + } + return + } } func (tx *Transaction) begin() (err error) { @@ -1190,6 +1202,8 @@ func (tx *Transaction) begin() (err error) { tx.trees = map[string]*parse.Tree{} tx.nodes = map[string]*ast.Node{} tx.luteEngine = util.NewLute() + tx.m.Lock() + tx.state.Store(0) return } @@ -1202,11 +1216,15 @@ func (tx *Transaction) commit() (err error) { refreshDynamicRefTexts(tx.nodes, tx.trees) IncSync() tx.trees = nil + tx.state.Store(1) + tx.m.Unlock() return } func (tx *Transaction) rollback() { tx.trees, tx.nodes = nil, nil + tx.state.Store(2) + tx.m.Unlock() return }