From f4e840fae6c46cab8a03c89249f6d59913005750 Mon Sep 17 00:00:00 2001 From: nekrondev Date: Sat, 26 Aug 2023 16:44:14 +0200 Subject: [PATCH] feat(assets): improve PDF asset parser performance (#9051) This commit changes the PDF parser from single-threaded processing to a multi-threaded worker pool, speeding up the extraction of text from PDF documents. Co-authored-by: Heiko Besemann --- kernel/model/asset_content.go | 161 ++++++++++++++++++++++++++-------- 1 file changed, 122 insertions(+), 39 deletions(-) diff --git a/kernel/model/asset_content.go b/kernel/model/asset_content.go index 9ee5f9a08..2ba193b8e 100644 --- a/kernel/model/asset_content.go +++ b/kernel/model/asset_content.go @@ -21,6 +21,7 @@ import ( "io/fs" "os" "path/filepath" + "runtime" "strconv" "strings" "sync" @@ -30,6 +31,7 @@ import ( "github.com/88250/gulu" "github.com/88250/lute/ast" "github.com/dustin/go-humanize" + "github.com/klippa-app/go-pdfium" "github.com/klippa-app/go-pdfium/requests" "github.com/klippa-app/go-pdfium/webassembly" "github.com/siyuan-note/eventbus" @@ -676,8 +678,70 @@ func (parser *XlsxAssetParser) Parse(absPath string) (ret *AssetParseResult) { type PdfAssetParser struct { } -// Parse will parse a PDF document using PDFium webassembly module +// pdfPage struct defines a worker job for text extraction +type pdfPage struct { + pageNo int // page number for text extraction + data *[]byte // pointer to PDF document data +} + +// pdfTextResult struct defines the extracted PDF text result +type pdfTextResult struct { + pageNo int // page number of PDF document + text string // text of converted page + err error // processing error +} + +// getTextPageWorker will extract the text from a given PDF page and return its result +func (parser *PdfAssetParser) getTextPageWorker(id int, instance pdfium.Pdfium, page <-chan *pdfPage, result chan<- *pdfTextResult) { + defer instance.Close() + for pd := range page { + doc, err := instance.OpenDocument(&requests.OpenDocument{ + File: pd.data, + }) + if err != 
nil { + instance.FPDF_CloseDocument(&requests.FPDF_CloseDocument{ + Document: doc.Document, + }) + result <- &pdfTextResult{ + pageNo: pd.pageNo, + err: err, + } + continue + } + + req := &requests.GetPageText{ + Page: requests.Page{ + ByIndex: &requests.PageByIndex{ + Document: doc.Document, + Index: pd.pageNo, + }, + }, + } + res, err := instance.GetPageText(req) + if err != nil { + instance.FPDF_CloseDocument(&requests.FPDF_CloseDocument{ + Document: doc.Document, + }) + result <- &pdfTextResult{ + pageNo: pd.pageNo, + err: err, + } + continue + } + instance.FPDF_CloseDocument(&requests.FPDF_CloseDocument{ + Document: doc.Document, + }) + result <- &pdfTextResult{ + pageNo: pd.pageNo, + text: res.Text, + err: nil, + } + } +} + +// Parse will parse a PDF document using PDFium webassembly module using a worker pool func (parser *PdfAssetParser) Parse(absPath string) (ret *AssetParseResult) { + st := time.Now() if !strings.HasSuffix(strings.ToLower(absPath), ".pdf") { return } @@ -692,24 +756,20 @@ func (parser *PdfAssetParser) Parse(absPath string) (ret *AssetParseResult) { } defer os.RemoveAll(tmp) - f, err := os.Open(tmp) - if nil != err { - logging.LogErrorf("open [%s] failed: [%s]", tmp, err) - return - } - defer f.Close() - - stat, err := f.Stat() + // PDF blob will be processed in-memory making sharing of PDF document data across worker goroutines possible + pdfData, err := os.ReadFile(tmp) if nil != err { logging.LogErrorf("open [%s] failed: [%s]", tmp, err) return } - // initialize pdfium with one worker + // initialize go-pdfium with number of available cores + // we fire up the complete worker pool for maximum performance + cores := runtime.NumCPU() pool, err := webassembly.Init(webassembly.Config{ - MinIdle: 1, - MaxIdle: 1, - MaxTotal: 1, + MinIdle: cores, + MaxIdle: cores, + MaxTotal: cores, }) if err != nil { logging.LogErrorf("convert [%s] failed: [%s]", tmp, err) @@ -717,50 +777,73 @@ func (parser *PdfAssetParser) Parse(absPath string) (ret 
*AssetParseResult) { } defer pool.Close() + // first get the number of PDF pages to convert into text instance, err := pool.GetInstance(time.Second * 30) if err != nil { logging.LogErrorf("convert [%s] failed: [%s]", tmp, err) return } - defer instance.Close() - - // get number of pages inside PDF document doc, err := instance.OpenDocument(&requests.OpenDocument{ - FileReader: f, - FileReaderSize: stat.Size(), + File: &pdfData, }) if err != nil { + instance.Close() logging.LogErrorf("convert [%s] failed: [%s]", tmp, err) return } - defer instance.FPDF_CloseDocument(&requests.FPDF_CloseDocument{ - Document: doc.Document, - }) + pc, err := instance.FPDF_GetPageCount(&requests.FPDF_GetPageCount{Document: doc.Document}) + if err != nil { + instance.FPDF_CloseDocument(&requests.FPDF_CloseDocument{ + Document: doc.Document, + }) + instance.Close() + logging.LogErrorf("convert [%s] failed: [%s]", tmp, err) + return + } + instance.Close() - pageCount, err := instance.FPDF_GetPageCount(&requests.FPDF_GetPageCount{Document: doc.Document}) - if err != nil { - logging.LogErrorf("convert [%s] failed: [%s]", tmp, err) - return - } - // loop through pages and get content - content := "" - for page := 0; page < pageCount.PageCount; page++ { - req := &requests.GetPageText{ - Page: requests.Page{ - ByIndex: &requests.PageByIndex{ - Document: doc.Document, - Index: page, - }, - }, - } - pt, err := instance.GetPageText(req) + // next setup worker pool for processing PDF pages + pages := make(chan *pdfPage, pc.PageCount) + results := make(chan *pdfTextResult, pc.PageCount) + for i := 0; i < cores; i++ { + inst, err := pool.GetInstance(time.Second * 30) if err != nil { + close(pages) + close(results) logging.LogErrorf("convert [%s] failed: [%s]", tmp, err) return } - content += " " + normalizeNonTxtAssetContent(pt.Text) + go parser.getTextPageWorker(i, inst, pages, results) } + // now split pages and let them process by worker pool + for p := 0; p < pc.PageCount; p++ { + pages <- 
&pdfPage{ + pageNo: p, + data: &pdfData, + } + } + close(pages) + + // finally fetch the PDF page text results + // Note: some workers will process pages faster than other workers depending on the page contents + // the order of returned PDF text pages is random and must be sorted using the pageNo index + pagetext := make([]string, pc.PageCount) + for p := 0; p < pc.PageCount; p++ { + res := <-results + pagetext[res.pageNo] = res.text + if res.err != nil { + logging.LogErrorf("convert [%s] of page %d failed: [%s]", tmp, res.pageNo, err) + } + } + close(results) + logging.LogInfof("convert [%s] PDF with %d pages using %d workers took %s.\n", tmp, pc.PageCount, cores, time.Since(st)) + + // loop through ordered PDF text pages and join content for asset parse DB result + content := "" + for _, pt := range pagetext { + content += " " + normalizeNonTxtAssetContent(pt) + } ret = &AssetParseResult{ Content: content, }