🎨 Improve the kernel task scheduling mechanism for better stability https://github.com/siyuan-note/siyuan/issues/7113

Liang Ding 2023-01-26 00:11:06 +08:00
parent 7492913ddd
commit c1ff45f4a4
GPG key ID: 136F30F901A2231D
9 changed files with 54 additions and 63 deletions

View file

@@ -975,8 +975,8 @@
     "86": "Please configure [Settings - About - Access authorization code]",
     "87": "Cannot move to this location",
     "88": "Finished parsing [%d] data files, remaining to be processed [%d]",
-    "89": "Created [%d] of data indexes of block-level elements [%s]",
-    "90": "Created [%d] of search indexes of block-level elements [%s]",
+    "89": "[%d/%d] Created [%d] of data indexes of block-level elements [%s]",
+    "90": "[%d/%d] Created [%d] of search indexes of block-level elements [%s]",
     "91": "Reading block tree data...",
     "92": "Parsing document tree [%s]",
     "93": "TODO",

View file

@@ -975,8 +975,8 @@
     "86": "Por favor, configure [Configuración - Acerca de - Código de autorización de acceso]",
     "87": "No se puede mover a esta ubicación",
     "88": "Se ha terminado de analizar [%d] archivos de datos, quedan por procesar [%d]",
-    "89": "Creado [%d] de índices de datos de elementos a nivel de bloque [%s]",
-    "90": "Creado [%d] de índices de búsqueda de elementos a nivel de bloque [%s]",
+    "89": "[%d/%d] Creado [%d] de índices de datos de elementos a nivel de bloque [%s]",
+    "90": "[%d/%d] Creado [%d] de índices de búsqueda de elementos a nivel de bloque [%s]",
     "91": "Leyendo datos del árbol de bloques...",
     "92": "Analizando el árbol del documento [%s]",
     "93": "TODO",

View file

@@ -975,8 +975,8 @@
     "86": "Veuillez configurer [Paramètres - A propos de - Code d'autorisation d'accès]",
     "87": "Impossible de se déplacer vers cet endroit",
     "88": "Fin de l'analyse des fichiers de données [%d], restant à traiter [%d]",
-    "89": "Créé [%d] d'index de données d'éléments de niveau bloc [%s]",
-    "90": "Création de [%d] index de recherche d'éléments de niveau bloc [%s]",
+    "89": "[%d/%d] Créé [%d] d'index de données d'éléments de niveau bloc [%s]",
+    "90": "[%d/%d] Création de [%d] index de recherche d'éléments de niveau bloc [%s]",
     "91": "Lecture des données de l'arborescence des blocs...",
     "92": "Analyse de l'arborescence du document [%s]",
     "93": "TODO",

View file

@@ -975,8 +975,8 @@
     "86": "請先配置 [設置 - 關於 - 存取授權碼]",
     "87": "無法移動到該位置",
     "88": "已完成解析 [%d] 個數據文件,剩餘待處理 [%d]",
-    "89": "已經建立 [%d] 個塊級元素的數據索引 [%s]",
-    "90": "已經建立 [%d] 個塊級元素的搜索索引 [%s]",
+    "89": "[%d/%d] 已經建立 [%d] 個塊級元素的數據索引 [%s]",
+    "90": "[%d/%d] 已經建立 [%d] 個塊級元素的搜索索引 [%s]",
     "91": "正在讀取塊樹數據...",
     "92": "正在解析文檔樹 [%s]",
     "93": "TODO",

View file

@@ -975,8 +975,8 @@
     "86": "请先配置 [设置 - 关于 - 访问授权码]",
     "87": "无法移动到该位置",
     "88": "已完成解析 [%d] 个数据文件,剩余待处理 [%d]",
-    "89": "已经建立 [%d] 个块级元素的数据索引 [%s]",
-    "90": "已经建立 [%d] 个块级元素的搜索索引 [%s]",
+    "89": "[%d/%d] 已经建立 [%d] 个块级元素的数据索引 [%s]",
+    "90": "[%d/%d] 已经建立 [%d] 个块级元素的搜索索引 [%s]",
     "91": "正在读取块树数据...",
     "92": "正在解析文档树 [%s]",
     "93": "TODO",

View file

@@ -187,23 +187,27 @@ func IndexRefs() {
 }
 
 func init() {
-    eventbus.Subscribe(eventbus.EvtSQLInsertBlocks, func(context map[string]interface{}, blockCount int, hash string) {
-        if util.ContainerAndroid == util.Container || util.ContainerIOS == util.Container {
-            // Android/iOS 端不显示数据索引和搜索索引状态提示 https://github.com/siyuan-note/siyuan/issues/6392
-            return
-        }
-
-        msg := fmt.Sprintf(Conf.Language(89), blockCount, hash)
-        util.SetBootDetails(msg)
-        util.ContextPushMsg(context, msg)
-    })
-    eventbus.Subscribe(eventbus.EvtSQLInsertBlocksFTS, func(context map[string]interface{}, blockCount int, hash string) {
+    //eventbus.Subscribe(eventbus.EvtSQLInsertBlocks, func(context map[string]interface{}, current, total, blockCount int, hash string) {
+    //    if util.ContainerAndroid == util.Container || util.ContainerIOS == util.Container {
+    //        // Android/iOS 端不显示数据索引和搜索索引状态提示 https://github.com/siyuan-note/siyuan/issues/6392
+    //        return
+    //    }
+    //
+    //    msg := fmt.Sprintf(Conf.Language(89), current, total, blockCount, hash)
+    //    util.SetBootDetails(msg)
+    //    util.ContextPushMsg(context, msg)
+    //})
+    eventbus.Subscribe(eventbus.EvtSQLInsertBlocksFTS, func(context map[string]interface{}, current, total, blockCount int, hash string) {
         if util.ContainerAndroid == util.Container || util.ContainerIOS == util.Container {
             // Android/iOS 端不显示数据索引和搜索索引状态提示 https://github.com/siyuan-note/siyuan/issues/6392
             return
         }
 
-        msg := fmt.Sprintf(Conf.Language(90), blockCount, hash)
+        if (1 > current && 1 == total) || current == total-1 {
+            current = total
+        }
+        msg := fmt.Sprintf(Conf.Language(90), current, total, blockCount, hash)
         util.SetBootDetails(msg)
         util.ContextPushMsg(context, msg)
     })
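
Only the FTS subscriber still pushes boot details; it now receives current and total from the queue and snaps current to total in the two edge cases where the zero-based loop index would read oddly (a single-item queue would otherwise show [0/1], and the last item would show [total-1/total]). A standalone sketch of that guard, using only sample values invented for illustration:

package main

import "fmt"

// normalize mirrors the guard added in the EvtSQLInsertBlocksFTS subscriber:
// the zero-based loop index is promoted to "total" when the queue has a
// single item or when the last item is being processed.
func normalize(current, total int) int {
    if (1 > current && 1 == total) || current == total-1 {
        return total
    }
    return current
}

func main() {
    fmt.Println(normalize(0, 1))  // 1  – a single-item queue no longer shows [0/1]
    fmt.Println(normalize(9, 10)) // 10 – the last item shows [10/10] instead of [9/10]
    fmt.Println(normalize(3, 10)) // 3  – intermediate items pass through unchanged
}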

View file

@@ -180,9 +180,10 @@ func initHistoryDBConnection() {
     dsn := util.HistoryDBPath + "?_journal_mode=OFF" +
         "&_synchronous=OFF" +
+        "&_mmap_size=2684354560" +
         "&_secure_delete=OFF" +
         "&_cache_size=-20480" +
-        "&_page_size=8192" +
+        "&_page_size=32768" +
         "&_busy_timeout=7000" +
         "&_ignore_check_constraints=ON" +
         "&_temp_store=MEMORY" +
@@ -212,9 +213,10 @@ func initDBConnection() {
     }
     dsn := util.DBPath + "?_journal_mode=WAL" +
         "&_synchronous=OFF" +
+        "&_mmap_size=2684354560" +
         "&_secure_delete=OFF" +
         "&_cache_size=-20480" +
-        "&_page_size=8192" +
+        "&_page_size=32768" +
         "&_busy_timeout=7000" +
         "&_ignore_check_constraints=ON" +
         "&_temp_store=MEMORY" +

View file

@@ -101,57 +101,42 @@ func FlushQueue() {
     txLock.Lock()
     defer txLock.Unlock()
 
     start := time.Now()
 
-    tx, err := beginTx()
-    if nil != err {
-        return
-    }
-
-    var execOps int
     context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
+    total := len(ops)
     for i, op := range ops {
         if util.IsExiting {
             return
         }
 
-        err = execOp(op, tx, context)
-        execOps++
-        if nil != err {
+        tx, err := beginTx()
+        if nil != err {
+            return
+        }
+
+        if err = execOp(op, tx, i, total, context); nil != err {
             logging.LogErrorf("queue operation failed: %s", err)
             return
         }
 
-        if 0 < i && 0 == execOps%128 {
-            if err = commitTx(tx); nil != err {
-                logging.LogErrorf("commit tx failed: %s", err)
-                return
-            }
-            execOps = 0
-            tx, err = beginTx()
-            if nil != err {
-                return
-            }
-        }
-    }
-
-    if 0 < execOps {
         if err = commitTx(tx); nil != err {
             logging.LogErrorf("commit tx failed: %s", err)
+            return
         }
     }
 
     elapsed := time.Now().Sub(start).Milliseconds()
     if 5000 < elapsed {
         logging.LogInfof("op tx [%dms]", elapsed)
     }
 }
 
-func execOp(op *dbQueueOperation, tx *sql.Tx, context map[string]interface{}) (err error) {
+func execOp(op *dbQueueOperation, tx *sql.Tx, current, total int, context map[string]interface{}) (err error) {
     switch op.action {
     case "index":
-        err = indexTree(tx, op.box, op.indexPath, context)
+        err = indexTree(tx, op.box, op.indexPath, current, total, context)
     case "upsert":
-        err = upsertTree(tx, op.upsertTree, context)
+        err = upsertTree(tx, op.upsertTree, current, total, context)
     case "delete":
         err = batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath)
     case "delete_id":
@@ -171,7 +156,7 @@ func execOp(op *dbQueueOperation, tx *sql.Tx, context map[string]interface{}) (e
     case "update_refs":
         err = upsertRefs(tx, op.upsertTree)
     default:
-        msg := fmt.Sprint("unknown operation [%s]", op.action)
+        msg := fmt.Sprintf("unknown operation [%s]", op.action)
         logging.LogErrorf(msg)
         err = errors.New(msg)
     }
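
This is the scheduling change itself: instead of one long-lived transaction committed every 128 operations, FlushQueue now opens and commits a transaction per queued operation, which keeps each write lock short and makes the i/total pair available for progress reporting. A simplified sketch of that pattern outside the kernel; dbQueueOperation is replaced by a hypothetical op struct, and the driver import is an assumption:

package main

import (
    "database/sql"
    "log"

    _ "github.com/mattn/go-sqlite3" // assumed driver, as in the kernel
)

// op is a stand-in for dbQueueOperation; only the statement matters here.
type op struct{ stmt string }

// flush mirrors the new FlushQueue shape: one short transaction per queued
// operation instead of a long-lived transaction committed every 128 ops.
func flush(db *sql.DB, ops []op) error {
    total := len(ops)
    for i, o := range ops {
        tx, err := db.Begin()
        if err != nil {
            return err
        }
        if _, err = tx.Exec(o.stmt); err != nil {
            tx.Rollback()
            return err
        }
        if err = tx.Commit(); err != nil {
            return err
        }
        log.Printf("[%d/%d] committed", i+1, total) // progress is now known per operation
    }
    return nil
}

func main() {
    db, err := sql.Open("sqlite3", ":memory:")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    queue := []op{
        {"CREATE TABLE t (id INTEGER)"},
        {"INSERT INTO t VALUES (1)"},
    }
    if err := flush(db, queue); err != nil {
        log.Fatal(err)
    }
}

The trade-off is more commit overhead per operation, presumably accepted here in exchange for the stability and responsiveness the commit title aims at.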

View file

@@ -51,7 +51,7 @@ const (
     FileAnnotationRefsPlaceholder = "(?, ?, ?, ?, ?, ?, ?, ?, ?)"
 )
 
-func insertBlocks(tx *sql.Tx, blocks []*Block, context map[string]interface{}) (err error) {
+func insertBlocks(tx *sql.Tx, blocks []*Block, current, total int, context map[string]interface{}) (err error) {
     if 1 > len(blocks) {
         return
     }
@@ -63,20 +63,20 @@ func insertBlocks(tx *sql.Tx, blocks []*Block, context map[string]interface{}) (
             continue
         }
 
-        if err = insertBlocks0(tx, bulk, context); nil != err {
+        if err = insertBlocks0(tx, bulk, current, total, context); nil != err {
             return
         }
         bulk = []*Block{}
     }
     if 0 < len(bulk) {
-        if err = insertBlocks0(tx, bulk, context); nil != err {
+        if err = insertBlocks0(tx, bulk, current, total, context); nil != err {
             return
         }
     }
     return
 }
 
-func insertBlocks0(tx *sql.Tx, bulk []*Block, context map[string]interface{}) (err error) {
+func insertBlocks0(tx *sql.Tx, bulk []*Block, current, total int, context map[string]interface{}) (err error) {
     valueStrings := make([]string, 0, len(bulk))
     valueArgs := make([]interface{}, 0, len(bulk)*strings.Count(BlocksPlaceholder, "?"))
     hashBuf := bytes.Buffer{}
@@ -114,7 +114,7 @@ func insertBlocks0(tx *sql.Tx, bulk []*Block, context map[string]interface{}) (e
     }
     hashBuf.WriteString("blocks")
     evtHash := fmt.Sprintf("%x", sha256.Sum256(hashBuf.Bytes()))[:7]
-    eventbus.Publish(eventbus.EvtSQLInsertBlocks, context, len(bulk), evtHash)
+    //eventbus.Publish(eventbus.EvtSQLInsertBlocks, context, current, total, len(bulk), evtHash)
 
     stmt = fmt.Sprintf(BlocksFTSInsert, strings.Join(valueStrings, ","))
     if err = prepareExecInsertTx(tx, stmt, valueArgs); nil != err {
@@ -129,7 +129,7 @@ func insertBlocks0(tx *sql.Tx, bulk []*Block, context map[string]interface{}) (e
     }
     hashBuf.WriteString("fts")
     evtHash = fmt.Sprintf("%x", sha256.Sum256(hashBuf.Bytes()))[:7]
-    eventbus.Publish(eventbus.EvtSQLInsertBlocksFTS, context, len(bulk), evtHash)
+    eventbus.Publish(eventbus.EvtSQLInsertBlocksFTS, context, current, total, len(bulk), evtHash)
     return
 }
@@ -393,24 +393,24 @@ func insertRefs(tx *sql.Tx, tree *parse.Tree) (err error) {
     return err
 }
 
-func indexTree(tx *sql.Tx, box, p string, context map[string]interface{}) (err error) {
+func indexTree(tx *sql.Tx, box, p string, current, total int, context map[string]interface{}) (err error) {
     tree, err := filesys.LoadTree(box, p, luteEngine)
     if nil != err {
         return
     }
 
-    err = insertTree(tx, tree, context)
+    err = insertTree(tx, tree, current, total, context)
     return
 }
 
-func insertTree(tx *sql.Tx, tree *parse.Tree, context map[string]interface{}) (err error) {
+func insertTree(tx *sql.Tx, tree *parse.Tree, current, total int, context map[string]interface{}) (err error) {
     blocks, spans, assets, attributes := fromTree(tree.Root, tree)
     refs, fileAnnotationRefs := refsFromTree(tree)
-    err = insertTree0(tx, tree, context, blocks, spans, assets, attributes, refs, fileAnnotationRefs)
+    err = insertTree0(tx, tree, current, total, context, blocks, spans, assets, attributes, refs, fileAnnotationRefs)
     return
 }
 
-func upsertTree(tx *sql.Tx, tree *parse.Tree, context map[string]interface{}) (err error) {
+func upsertTree(tx *sql.Tx, tree *parse.Tree, current, total int, context map[string]interface{}) (err error) {
     oldBlockHashes := queryBlockHashes(tree.ID)
     blocks, spans, assets, attributes := fromTree(tree.Root, tree)
     newBlockHashes := map[string]string{}
@@ -460,16 +460,16 @@ func upsertTree(tx *sql.Tx, tree *parse.Tree, context map[string]interface{}) (e
     }
     refs, fileAnnotationRefs := refsFromTree(tree)
-    if err = insertTree0(tx, tree, context, blocks, spans, assets, attributes, refs, fileAnnotationRefs); nil != err {
+    if err = insertTree0(tx, tree, current, total, context, blocks, spans, assets, attributes, refs, fileAnnotationRefs); nil != err {
         return
     }
     return err
 }
 
-func insertTree0(tx *sql.Tx, tree *parse.Tree, context map[string]interface{},
+func insertTree0(tx *sql.Tx, tree *parse.Tree, current, total int, context map[string]interface{},
     blocks []*Block, spans []*Span, assets []*Asset, attributes []*Attribute,
     refs []*Ref, fileAnnotationRefs []*FileAnnotationRef) (err error) {
-    if err = insertBlocks(tx, blocks, context); nil != err {
+    if err = insertBlocks(tx, blocks, current, total, context); nil != err {
         return
     }
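
The current/total pair is threaded from FlushQueue through execOp, indexTree/upsertTree, insertTree0 and insertBlocks down to the eventbus.Publish call, where the subscriber registered in init() above formats it into the [%d/%d] boot message. The sketch below shows only that hand-off with a minimal stand-in bus; it is not the real siyuan-note/eventbus API, and the event name is invented:

package main

import "fmt"

// bus is a minimal stand-in for the project's eventbus: handlers are
// registered per event name and invoked with whatever arguments Publish
// passes along. It only illustrates how current/total travel from the SQL
// layer to the UI subscriber.
type bus struct {
    handlers map[string][]func(args ...interface{})
}

func newBus() *bus { return &bus{handlers: map[string][]func(args ...interface{}){}} }

func (b *bus) Subscribe(evt string, h func(args ...interface{})) {
    b.handlers[evt] = append(b.handlers[evt], h)
}

func (b *bus) Publish(evt string, args ...interface{}) {
    for _, h := range b.handlers[evt] {
        h(args...)
    }
}

func main() {
    const evtInsertBlocksFTS = "sql.insert.blocks.fts" // illustrative name, not the real constant

    b := newBus()
    b.Subscribe(evtInsertBlocksFTS, func(args ...interface{}) {
        current, total := args[0].(int), args[1].(int)
        blockCount, hash := args[2].(int), args[3].(string)
        fmt.Printf("[%d/%d] Created [%d] of search indexes of block-level elements [%s]\n",
            current, total, blockCount, hash)
    })

    // What insertBlocks0 does after this commit, with illustrative values.
    b.Publish(evtInsertBlocksFTS, 12, 128, 64, "a1b2c3d")
}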