feat(assets): improve PDF asset parser performance (#9051)

This commit will change the single-threaded behavior of
PDF parser into multi-threaded worker pool
speeding up PDF parsing into text

Co-authored-by: Heiko Besemann <heiko.besemann@qbeyond.de>
This commit is contained in:
nekrondev 2023-08-26 16:44:14 +02:00 committed by GitHub
parent 0beca364fa
commit f4e840fae6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -21,6 +21,7 @@ import (
"io/fs"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
@ -30,6 +31,7 @@ import (
"github.com/88250/gulu"
"github.com/88250/lute/ast"
"github.com/dustin/go-humanize"
"github.com/klippa-app/go-pdfium"
"github.com/klippa-app/go-pdfium/requests"
"github.com/klippa-app/go-pdfium/webassembly"
"github.com/siyuan-note/eventbus"
@ -676,8 +678,70 @@ func (parser *XlsxAssetParser) Parse(absPath string) (ret *AssetParseResult) {
type PdfAssetParser struct {
}
// Parse will parse a PDF document using PDFium webassembly module
// pdfPage struct defines a worker job for text extraction
type pdfPage struct {
pageNo int // page number for text extraction
data *[]byte // pointer to PDF document data
}
// pdfTextResult struct defines the extracted PDF text result
type pdfTextResult struct {
pageNo int // page number of PDF document
text string // text of converted page
err error // processing error
}
// getTextPageWorker will extract the text from a given PDF page and return its result
func (parser *PdfAssetParser) getTextPageWorker(id int, instance pdfium.Pdfium, page <-chan *pdfPage, result chan<- *pdfTextResult) {
defer instance.Close()
for pd := range page {
doc, err := instance.OpenDocument(&requests.OpenDocument{
File: pd.data,
})
if err != nil {
instance.FPDF_CloseDocument(&requests.FPDF_CloseDocument{
Document: doc.Document,
})
result <- &pdfTextResult{
pageNo: pd.pageNo,
err: err,
}
continue
}
req := &requests.GetPageText{
Page: requests.Page{
ByIndex: &requests.PageByIndex{
Document: doc.Document,
Index: pd.pageNo,
},
},
}
res, err := instance.GetPageText(req)
if err != nil {
instance.FPDF_CloseDocument(&requests.FPDF_CloseDocument{
Document: doc.Document,
})
result <- &pdfTextResult{
pageNo: pd.pageNo,
err: err,
}
continue
}
instance.FPDF_CloseDocument(&requests.FPDF_CloseDocument{
Document: doc.Document,
})
result <- &pdfTextResult{
pageNo: pd.pageNo,
text: res.Text,
err: nil,
}
}
}
// Parse will parse a PDF document using PDFium webassembly module using a worker pool
func (parser *PdfAssetParser) Parse(absPath string) (ret *AssetParseResult) {
st := time.Now()
if !strings.HasSuffix(strings.ToLower(absPath), ".pdf") {
return
}
@ -692,24 +756,20 @@ func (parser *PdfAssetParser) Parse(absPath string) (ret *AssetParseResult) {
}
defer os.RemoveAll(tmp)
f, err := os.Open(tmp)
if nil != err {
logging.LogErrorf("open [%s] failed: [%s]", tmp, err)
return
}
defer f.Close()
stat, err := f.Stat()
// PDF blob will be processed in-memory making sharing of PDF document data across worker goroutines possible
pdfData, err := os.ReadFile(tmp)
if nil != err {
logging.LogErrorf("open [%s] failed: [%s]", tmp, err)
return
}
// initialize pdfium with one worker
// initialize go-pdfium with number of available cores
// we fire up the complete worker pool for maximum performance
cores := runtime.NumCPU()
pool, err := webassembly.Init(webassembly.Config{
MinIdle: 1,
MaxIdle: 1,
MaxTotal: 1,
MinIdle: cores,
MaxIdle: cores,
MaxTotal: cores,
})
if err != nil {
logging.LogErrorf("convert [%s] failed: [%s]", tmp, err)
@ -717,50 +777,73 @@ func (parser *PdfAssetParser) Parse(absPath string) (ret *AssetParseResult) {
}
defer pool.Close()
// first get the number of PDF pages to convert into text
instance, err := pool.GetInstance(time.Second * 30)
if err != nil {
logging.LogErrorf("convert [%s] failed: [%s]", tmp, err)
return
}
defer instance.Close()
// get number of pages inside PDF document
doc, err := instance.OpenDocument(&requests.OpenDocument{
FileReader: f,
FileReaderSize: stat.Size(),
File: &pdfData,
})
if err != nil {
instance.Close()
logging.LogErrorf("convert [%s] failed: [%s]", tmp, err)
return
}
defer instance.FPDF_CloseDocument(&requests.FPDF_CloseDocument{
Document: doc.Document,
})
pc, err := instance.FPDF_GetPageCount(&requests.FPDF_GetPageCount{Document: doc.Document})
if err != nil {
instance.FPDF_CloseDocument(&requests.FPDF_CloseDocument{
Document: doc.Document,
})
instance.Close()
logging.LogErrorf("convert [%s] failed: [%s]", tmp, err)
return
}
instance.Close()
pageCount, err := instance.FPDF_GetPageCount(&requests.FPDF_GetPageCount{Document: doc.Document})
if err != nil {
logging.LogErrorf("convert [%s] failed: [%s]", tmp, err)
return
}
// loop through pages and get content
content := ""
for page := 0; page < pageCount.PageCount; page++ {
req := &requests.GetPageText{
Page: requests.Page{
ByIndex: &requests.PageByIndex{
Document: doc.Document,
Index: page,
},
},
}
pt, err := instance.GetPageText(req)
// next setup worker pool for processing PDF pages
pages := make(chan *pdfPage, pc.PageCount)
results := make(chan *pdfTextResult, pc.PageCount)
for i := 0; i < cores; i++ {
inst, err := pool.GetInstance(time.Second * 30)
if err != nil {
close(pages)
close(results)
logging.LogErrorf("convert [%s] failed: [%s]", tmp, err)
return
}
content += " " + normalizeNonTxtAssetContent(pt.Text)
go parser.getTextPageWorker(i, inst, pages, results)
}
// now split pages and let them process by worker pool
for p := 0; p < pc.PageCount; p++ {
pages <- &pdfPage{
pageNo: p,
data: &pdfData,
}
}
close(pages)
// finally fetch the PDF page text results
// Note: some workers will process pages faster than other workers depending on the page contents
// the order of returned PDF text pages is random and must be sorted using the pageNo index
pagetext := make([]string, pc.PageCount)
for p := 0; p < pc.PageCount; p++ {
res := <-results
pagetext[res.pageNo] = res.text
if res.err != nil {
logging.LogErrorf("convert [%s] of page %d failed: [%s]", tmp, res.pageNo, err)
}
}
close(results)
logging.LogInfof("convert [%s] PDF with %d pages using %d workers took %s.\n", tmp, pc.PageCount, cores, time.Since(st))
// loop through ordered PDF text pages and join content for asset parse DB result
content := ""
for _, pt := range pagetext {
content += " " + normalizeNonTxtAssetContent(pt)
}
ret = &AssetParseResult{
Content: content,
}