tdf#107945: prepare for future multi-threading of pivot cache...

It's disabled for now. We need to first make the edit engine and
a few other places thread-safe before we can parallelize this code.

Change-Id: Ie09536964ece42d43f505afc5e2611d469cc5c95
Reviewed-on: https://gerrit.libreoffice.org/38424
Tested-by: Jenkins <ci@libreoffice.org>
Reviewed-by: Kohei Yoshida <libreoffice@kohei.us>
diff --git a/sc/CppunitTest_sc_ucalc.mk b/sc/CppunitTest_sc_ucalc.mk
index 5e91862..d918182 100644
--- a/sc/CppunitTest_sc_ucalc.mk
+++ b/sc/CppunitTest_sc_ucalc.mk
@@ -112,6 +112,12 @@ $(eval $(call gb_CppunitTest_add_libs,sc_ucalc,\
))
endif

ifeq ($(OS), $(filter LINUX %BSD SOLARIS, $(OS)))
$(eval $(call gb_CppunitTest_add_libs,sc_ucalc,\
    -lpthread \
))
endif

$(eval $(call gb_CppunitTest_use_configuration,sc_ucalc))

# vim: set noet sw=4 ts=4:
diff --git a/sc/Library_sc.mk b/sc/Library_sc.mk
index e72c615..5b7d2fc 100644
--- a/sc/Library_sc.mk
+++ b/sc/Library_sc.mk
@@ -696,6 +696,12 @@ $(eval $(call gb_Library_add_libs,sc,\
))
endif

ifeq ($(OS), $(filter LINUX %BSD SOLARIS, $(OS)))
$(eval $(call gb_Library_add_libs,sc,\
    -lpthread \
))
endif

$(eval $(call gb_SdiTarget_SdiTarget,sc/sdi/scslots,sc/sdi/scalc))

$(eval $(call gb_SdiTarget_set_include,sc/sdi/scslots,\
diff --git a/sc/inc/dpcache.hxx b/sc/inc/dpcache.hxx
index 515a3fa..3bac275 100644
--- a/sc/inc/dpcache.hxx
+++ b/sc/inc/dpcache.hxx
@@ -42,9 +42,9 @@ struct ScDPNumGroupInfo;
 */
class SC_DLLPUBLIC ScDPCache
{
    typedef std::unordered_set<OUString, OUStringHash> StringSetType;

public:
    typedef std::unordered_set<OUString, OUStringHash> StringSetType;
    typedef mdds::flat_segment_tree<SCROW, bool> EmptyRowsType;
    typedef std::vector<ScDPItemData> ScDPItemDataVec;
    typedef std::set<ScDPObject*> ScDPObjectSet;
    typedef std::vector<SCROW> IndexArrayType;
@@ -117,17 +117,17 @@ private:

    FieldsType maFields;
    GroupFieldsType maGroupFields;
    StringSetType maStringPool;
    std::vector<StringSetType> maStringPools; // one for each field.

    std::vector<OUString> maLabelNames; // Stores dimension names and the data layout dimension name at position 0.
    mdds::flat_segment_tree<SCROW, bool> maEmptyRows;
    EmptyRowsType maEmptyRows;
    SCROW mnDataSize;
    SCROW mnRowCount;

    bool mbDisposing;

public:
    rtl_uString* InternString( const OUString& rStr );
    rtl_uString* InternString( size_t nDim, const OUString& rStr );
    void AddReference(ScDPObject* pObj) const;
    void RemoveReference(ScDPObject* pObj) const;
    const ScDPObjectSet& GetAllReferences() const;
diff --git a/sc/source/core/data/dpcache.cxx b/sc/source/core/data/dpcache.cxx
index 0383108..2064d84 100644
--- a/sc/source/core/data/dpcache.cxx
+++ b/sc/source/core/data/dpcache.cxx
@@ -43,6 +43,17 @@
#include <com/sun/star/sheet/DataPilotFieldGroupBy.hpp>
#endif

// TODO : Threaded pivot cache operation is disabled until we can figure out
// ways to make the edit engine and number formatter codes thread-safe in a
// proper fashion.
#define ENABLE_THREADED_PIVOT_CACHE 0

#if ENABLE_THREADED_PIVOT_CACHE
#include <thread>
#include <future>
#include <queue>
#endif

using namespace ::com::sun::star;

using ::com::sun::star::uno::Exception;
@@ -106,6 +117,17 @@ private:
    ScDocument* mpDoc;
};

rtl_uString* internString( ScDPCache::StringSetType& rPool, const OUString& rStr )
{
    ScDPCache::StringSetType::iterator it = rPool.find(rStr);
    if (it != rPool.end())
        // In the pool.
        return (*it).pData;

    std::pair<ScDPCache::StringSetType::iterator, bool> r = rPool.insert(rStr);
    return r.second ? (*r.first).pData : nullptr;
}

OUString createLabelString( const ScDocument* pDoc, SCCOL nCol, const ScRefCellValue& rCell )
{
    OUString aDocStr = rCell.getRawString(pDoc);
@@ -125,16 +147,15 @@ OUString createLabelString( const ScDocument* pDoc, SCCOL nCol, const ScRefCellV
}

void initFromCell(
    ScDPCache& rCache, ScDocument* pDoc, const ScAddress& rPos,
    const ScRefCellValue& rCell,
    ScDPItemData& rData, sal_uInt32& rNumFormat)
    ScDPCache::StringSetType& rStrPool, ScDocument* pDoc, const ScAddress& rPos,
    const ScRefCellValue& rCell, ScDPItemData& rData, sal_uInt32& rNumFormat)
{
    OUString aDocStr = rCell.getRawString(pDoc);
    rNumFormat = 0;

    if (rCell.hasError())
    {
        rData.SetErrorStringInterned(rCache.InternString(aDocStr));
        rData.SetErrorStringInterned(internString(rStrPool, aDocStr));
    }
    else if (rCell.hasNumeric())
    {
@@ -144,7 +165,7 @@ void initFromCell(
    }
    else if (!rCell.isEmpty())
    {
        rData.SetStringInterned(rCache.InternString(aDocStr));
        rData.SetStringInterned(internString(rStrPool, aDocStr));
    }
    else
        rData.SetEmpty();
@@ -273,42 +294,249 @@ void processBuckets(std::vector<Bucket>& aBuckets, ScDPCache::Field& rField)
    std::for_each(itBeg, itUniqueEnd, PushBackValue(rField.maItems));
}

struct InitColumnData
{
    ScDPCache::EmptyRowsType maEmptyRows;
    OUString maLabel;

    ScDPCache::StringSetType* mpStrPool;
    ScDPCache::Field* mpField;

    SCCOL mnCol;

    InitColumnData() :
        maEmptyRows(0, MAXROWCOUNT, true),
        mpStrPool(nullptr),
        mpField(nullptr),
        mnCol(-1) {}

    void init( SCCOL nCol, ScDPCache::StringSetType* pStrPool, ScDPCache::Field* pField )
    {
        mpStrPool = pStrPool;
        mpField = pField;
        mnCol = nCol;
    }
};

struct InitDocData
{
    ScDocument* mpDoc;
    SCTAB mnDocTab;
    SCROW mnStartRow;
    SCROW mnEndRow;
    bool mbTailEmptyRows;

    InitDocData() :
        mpDoc(nullptr),
        mnDocTab(-1),
        mnStartRow(-1),
        mnEndRow(-1),
        mbTailEmptyRows(false) {}
};

typedef std::unordered_set<OUString, OUStringHash> LabelSet;

class InsertLabel : public std::unary_function<OUString, void>
{
    LabelSet& mrNames;
public:
    explicit InsertLabel(LabelSet& rNames) : mrNames(rNames) {}
    void operator() (const OUString& r)
    {
        mrNames.insert(r);
    }
};

std::vector<OUString> normalizeLabels( const std::vector<InitColumnData>& rColData )
{
    std::vector<OUString> aLabels(1u, ScGlobal::GetRscString(STR_PIVOT_DATA));

    LabelSet aExistingNames;

    for (const InitColumnData& rCol : rColData)
    {
        const OUString& rLabel = rCol.maLabel;
        sal_Int32 nSuffix = 1;
        OUString aNewLabel = rLabel;
        while (true)
        {
            if (!aExistingNames.count(aNewLabel))
            {
                // this is a unique label.
                aLabels.push_back(aNewLabel);
                aExistingNames.insert(aNewLabel);
                break;
            }

            // This name already exists.
            OUStringBuffer aBuf(rLabel);
            aBuf.append(++nSuffix);
            aNewLabel = aBuf.makeStringAndClear();
        }
    }

    return aLabels;
}

void initColumnFromDoc( InitDocData& rDocData, InitColumnData &rColData )
{
    ScDPCache::Field& rField = *rColData.mpField;
    ScDocument* pDoc = rDocData.mpDoc;
    SCTAB nDocTab = rDocData.mnDocTab;
    SCCOL nCol = rColData.mnCol;
    SCROW nStartRow = rDocData.mnStartRow;
    SCROW nEndRow = rDocData.mnEndRow;
    bool bTailEmptyRows = rDocData.mbTailEmptyRows;

    std::unique_ptr<sc::ColumnIterator> pIter =
        pDoc->GetColumnIterator(nDocTab, nCol, nStartRow, nEndRow);
    assert(pIter);
    assert(pIter->hasCell());

    ScDPItemData aData;

    rColData.maLabel = createLabelString(pDoc, nCol, pIter->getCell());
    pIter->next();

    std::vector<Bucket> aBuckets;
    aBuckets.reserve(nEndRow-nStartRow); // skip the topmost label cell.

    // Push back all original values.
    for (SCROW i = 0, n = nEndRow-nStartRow; i < n; ++i, pIter->next())
    {
        assert(pIter->hasCell());

        sal_uInt32 nNumFormat = 0;
        ScAddress aPos(nCol, pIter->getRow(), nDocTab);
        initFromCell(*rColData.mpStrPool, pDoc, aPos, pIter->getCell(), aData, nNumFormat);

        aBuckets.emplace_back(aData, i);

        if (!aData.IsEmpty())
        {
            rColData.maEmptyRows.insert_back(i, i+1, false);
            if (nNumFormat)
                // Only take non-default number format.
                rField.mnNumFormat = nNumFormat;
        }
    }

    processBuckets(aBuckets, rField);

    if (bTailEmptyRows)
    {
        // If the last item is not empty, append one. Note that the items
        // are sorted, and empty item should come last when sorted.
        if (rField.maItems.empty() || !rField.maItems.back().IsEmpty())
        {
            aData.SetEmpty();
            rField.maItems.push_back(aData);
        }
    }
}

#if ENABLE_THREADED_PIVOT_CACHE

class ThreadQueue
{
    using FutureType = std::future<void>;
    std::queue<FutureType> maQueue;
    std::mutex maMutex;
    std::condition_variable maCond;

    size_t mnMaxQueue;

public:
    ThreadQueue( size_t nMaxQueue ) : mnMaxQueue(nMaxQueue) {}

    void push( std::function<void()> aFunc )
    {
        std::unique_lock<std::mutex> lock(maMutex);

        while (maQueue.size() >= mnMaxQueue)
            maCond.wait(lock);

        FutureType f = std::async(std::launch::async, aFunc);
        maQueue.push(std::move(f));
        lock.unlock();

        maCond.notify_one();
    }

    void waitForOne()
    {
        std::unique_lock<std::mutex> lock(maMutex);

        while (maQueue.empty())
            maCond.wait(lock);

        FutureType ret = std::move(maQueue.front());
        maQueue.pop();
        lock.unlock();

        ret.get(); // This may throw if an exception was thrown on the async thread.

        maCond.notify_one();
    }
};

class ThreadScopedGuard
{
    std::thread maThread;
public:
    ThreadScopedGuard(std::thread thread) : maThread(std::move(thread)) {}
    ThreadScopedGuard(ThreadScopedGuard&& other) : maThread(std::move(other.maThread)) {}

    ThreadScopedGuard(const ThreadScopedGuard&) = delete;
    ThreadScopedGuard& operator= (const ThreadScopedGuard&) = delete;

    ~ThreadScopedGuard()
    {
        maThread.join();
    }
};

#endif

}

void ScDPCache::InitFromDoc(ScDocument* pDoc, const ScRange& rRange)
{
    Clear();

    InitDocData aDocData;
    aDocData.mpDoc = pDoc;

    // Make sure the formula cells within the data range are interpreted
    // during this call, for this method may be called from the interpretation
    // of GETPIVOTDATA, which disables nested formula interpretation without
    // increasing the macro level.
    MacroInterpretIncrementer aMacroInc(pDoc);

    SCROW nStartRow = rRange.aStart.Row();  // start of data
    SCROW nEndRow = rRange.aEnd.Row();
    aDocData.mnStartRow = rRange.aStart.Row();  // start of data
    aDocData.mnEndRow = rRange.aEnd.Row();

    // Sanity check
    if (!ValidRow(nStartRow) || !ValidRow(nEndRow) || nEndRow <= nStartRow)
    if (!ValidRow(aDocData.mnStartRow) || !ValidRow(aDocData.mnEndRow) || aDocData.mnEndRow <= aDocData.mnStartRow)
        return;

    sal_uInt16 nStartCol = rRange.aStart.Col();
    sal_uInt16 nEndCol = rRange.aEnd.Col();
    sal_uInt16 nDocTab = rRange.aStart.Tab();
    SCCOL nStartCol = rRange.aStart.Col();
    SCCOL nEndCol = rRange.aEnd.Col();
    aDocData.mnDocTab = rRange.aStart.Tab();

    mnColumnCount = nEndCol - nStartCol + 1;

    // this row count must include the trailing empty rows.
    mnRowCount = nEndRow - nStartRow; // skip the topmost label row.
    mnRowCount = aDocData.mnEndRow - aDocData.mnStartRow; // skip the topmost label row.

    // Skip trailing empty rows if exists.
    SCCOL nCol1 = nStartCol, nCol2 = nEndCol;
    SCROW nRow1 = nStartRow, nRow2 = nEndRow;
    pDoc->ShrinkToDataArea(nDocTab, nCol1, nRow1, nCol2, nRow2);
    bool bTailEmptyRows = nEndRow > nRow2; // Trailing empty rows exist.
    nEndRow = nRow2;
    SCROW nRow1 = aDocData.mnStartRow, nRow2 = aDocData.mnEndRow;
    pDoc->ShrinkToDataArea(aDocData.mnDocTab, nCol1, nRow1, nCol2, nRow2);
    aDocData.mbTailEmptyRows = aDocData.mnEndRow > nRow2; // Trailing empty rows exist.
    aDocData.mnEndRow = nRow2;

    if (nEndRow <= nStartRow)
    if (aDocData.mnEndRow <= aDocData.mnStartRow)
    {
        // Check this again since the end row position has changed. It's
        // possible that the new end row becomes lower than the start row
@@ -317,6 +545,8 @@ void ScDPCache::InitFromDoc(ScDocument* pDoc, const ScRange& rRange)
        return;
    }

    maStringPools.resize(mnColumnCount);
    std::vector<InitColumnData> aColData(mnColumnCount);
    maFields.reserve(mnColumnCount);
    for (size_t i = 0; i < static_cast<size_t>(mnColumnCount); ++i)
        maFields.push_back(o3tl::make_unique<Field>());
@@ -326,52 +556,61 @@ void ScDPCache::InitFromDoc(ScDocument* pDoc, const ScRange& rRange)
    // Ensure that none of the formula cells in the data range are dirty.
    pDoc->EnsureFormulaCellResults(rRange);

    ScDPItemData aData;
#if ENABLE_THREADED_PIVOT_CACHE
    ThreadQueue aQueue(std::thread::hardware_concurrency());

    auto aFuncLaunchFieldThreads = [&]()
    {
        for (sal_uInt16 nCol = nStartCol; nCol <= nEndCol; ++nCol)
        {
            size_t nDim = nCol - nStartCol;
            InitColumnData& rColData = aColData[nDim];
            rColData.init(nCol, &maStringPools[nDim], maFields[nDim].get());

            auto func = [&aDocData,&rColData]()
            {
                initColumnFromDoc(aDocData, rColData);
            };

            aQueue.push(std::move(func));
        }
    };

    {
        // Launch a separate thread that in turn spawns async threads to populate the fields.
        std::thread t(aFuncLaunchFieldThreads);
        ThreadScopedGuard sg(std::move(t));

        // Wait for all the async threads to complete on the main thread.
        for (SCCOL i = 0; i < mnColumnCount; ++i)
            aQueue.waitForOne();
    }

#else
    for (sal_uInt16 nCol = nStartCol; nCol <= nEndCol; ++nCol)
    {
        std::unique_ptr<sc::ColumnIterator> pIter =
            pDoc->GetColumnIterator(nDocTab, nCol, nStartRow, nEndRow);
        assert(pIter);
        assert(pIter->hasCell());
        size_t nDim = nCol - nStartCol;
        InitColumnData& rColData = aColData[nDim];
        rColData.init(nCol, &maStringPools[nDim], maFields[nDim].get());

        AddLabel(createLabelString(pDoc, nCol, pIter->getCell()));
        pIter->next();
        initColumnFromDoc(aDocData, rColData);
    }
#endif

        Field& rField = *maFields[nCol-nStartCol].get();
        std::vector<Bucket> aBuckets;
        aBuckets.reserve(nEndRow-nStartRow); // skip the topmost label cell.
    maLabelNames = normalizeLabels(aColData);

        // Push back all original values.
        for (SCROW i = 0, n = nEndRow-nStartRow; i < n; ++i, pIter->next())
    // Merge all non-empty rows data.
    for (const InitColumnData& rCol : aColData)
    {
        EmptyRowsType::const_segment_iterator it = rCol.maEmptyRows.begin_segment();
        EmptyRowsType::const_segment_iterator ite = rCol.maEmptyRows.end_segment();
        EmptyRowsType::const_iterator pos = maEmptyRows.begin();

        for (; it != ite; ++it)
        {
            assert(pIter->hasCell());

            sal_uInt32 nNumFormat = 0;
            ScAddress aPos(nCol, pIter->getRow(), nDocTab);
            initFromCell(*this, pDoc, aPos, pIter->getCell(), aData, nNumFormat);

            aBuckets.push_back(Bucket(aData, i));

            if (!aData.IsEmpty())
            {
                maEmptyRows.insert_back(i, i+1, false);
                if (nNumFormat)
                    // Only take non-default number format.
                    rField.mnNumFormat = nNumFormat;
            }
        }

        processBuckets(aBuckets, rField);

        if (bTailEmptyRows)
        {
            // If the last item is not empty, append one. Note that the items
            // are sorted, and empty item should come last when sorted.
            if (rField.maItems.empty() || !rField.maItems.back().IsEmpty())
            {
                aData.SetEmpty();
                rField.maItems.push_back(aData);
            }
            if (!it->value)
                // Non-empty segment found.  Record it.
                pos = maEmptyRows.insert(pos, it->start, it->end, false).first;
        }
    }

@@ -385,6 +624,7 @@ bool ScDPCache::InitFromDataBase(DBConnector& rDB)
    try
    {
        mnColumnCount = rDB.getColumnCount();
        maStringPools.resize(mnColumnCount);
        maFields.clear();
        maFields.reserve(mnColumnCount);
        for (size_t i = 0; i < static_cast<size_t>(mnColumnCount); ++i)
@@ -690,23 +930,6 @@ OUString ScDPCache::GetDimensionName(std::vector<OUString>::size_type nDim) cons
        return OUString();
}

namespace {

typedef std::unordered_set<OUString, OUStringHash> LabelSet;

class InsertLabel : public std::unary_function<OUString, void>
{
    LabelSet& mrNames;
public:
    explicit InsertLabel(LabelSet& rNames) : mrNames(rNames) {}
    void operator() (const OUString& r)
    {
        mrNames.insert(r);
    }
};

}

void ScDPCache::PostInit()
{
    OSL_ENSURE(!maFields.empty(), "Cache not initialized!");
@@ -734,7 +957,7 @@ void ScDPCache::Clear()
    maLabelNames.clear();
    maGroupFields.clear();
    maEmptyRows.clear();
    maStringPool.clear();
    maStringPools.clear();
}

void ScDPCache::AddLabel(const OUString& rLabel)
@@ -901,15 +1124,10 @@ SCCOL ScDPCache::GetDimensionIndex(const OUString& sName) const
    return -1;
}

rtl_uString* ScDPCache::InternString( const OUString& rStr )
rtl_uString* ScDPCache::InternString( size_t nDim, const OUString& rStr )
{
    StringSetType::iterator it = maStringPool.find(rStr);
    if (it != maStringPool.end())
        // In the pool.
        return (*it).pData;

    std::pair<StringSetType::iterator, bool> r = maStringPool.insert(rStr);
    return r.second ? (*r.first).pData : nullptr;
    assert(nDim < maStringPools.size());
    return internString(maStringPools[nDim], rStr);
}

void ScDPCache::AddReference(ScDPObject* pObj) const
diff --git a/sc/source/core/data/dpobject.cxx b/sc/source/core/data/dpobject.cxx
index 19dc7e8..8107428 100644
--- a/sc/source/core/data/dpobject.cxx
+++ b/sc/source/core/data/dpobject.cxx
@@ -247,7 +247,9 @@ void DBConnector::getValue(long nCol, ScDPItemData &rData, short& rNumType) cons
            case sdbc::DataType::VARBINARY:
            case sdbc::DataType::LONGVARBINARY:
            default:
                rData.SetStringInterned(mrCache.InternString(mxRow->getString(nCol+1)));
                // nCol is 0-based, and the left-most column always has nCol == 0.
                rData.SetStringInterned(
                    mrCache.InternString(nCol, mxRow->getString(nCol+1)));
        }
    }
    catch (uno::Exception&)