tdf#125662: do parallel-zip in batches

In this approach the input stream is read one batch (of constant size)
at a time and each batch is compressed by ThreadedDeflater. After
we are done with a batch, the deflated buffer is processed straightaway
(directed to file backed storage).

Change-Id: I2d42f86cf5898e4d746836d94bf6009a8d3b0230
Reviewed-on: https://gerrit.libreoffice.org/c/core/+/86596
Tested-by: Jenkins
Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
diff --git a/package/inc/ThreadedDeflater.hxx b/package/inc/ThreadedDeflater.hxx
index 3bd7e4b..f22a40a 100644
--- a/package/inc/ThreadedDeflater.hxx
+++ b/package/inc/ThreadedDeflater.hxx
@@ -21,37 +21,48 @@
#define INCLUDED_PACKAGE_THREADEDDEFLATER_HXX

#include <com/sun/star/uno/Sequence.hxx>
#include <com/sun/star/io/XInputStream.hpp>
#include <com/sun/star/uno/Reference.hxx>
#include <package/packagedllapi.hxx>
#include <comphelper/threadpool.hxx>
#include <atomic>
#include <memory>
#include <vector>
#include <functional>

namespace ZipUtils
{
/// Parallel compression of a stream using the zlib deflate algorithm.
///
/// Almost a replacement for the Deflater class. Call startDeflate() with the data,
/// check with finished() or waitForTasks() and retrieve result with getOutput().
/// The class will internally split into multiple threads.
/// Call deflateWrite() with the input stream and input/output processing functions.
/// This will use multiple threads for compression on each batch of data from the stream.
class ThreadedDeflater final
{
    class Task;
    // Note: All this should be lock-less. Each task writes only to its part
    // of the data, flags are atomic.
    // of the data.
    std::vector<std::vector<sal_Int8>> outBuffers;
    std::shared_ptr<comphelper::ThreadTaskTag> threadTaskTag;
    css::uno::Sequence<sal_Int8> inBuffer;
    css::uno::Sequence<sal_Int8> prevDataBlock;
    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> maProcessOutputFunc;
    sal_Int64 totalIn;
    sal_Int64 totalOut;
    int zlibLevel;
    std::atomic<int> pendingTasksCount;

public:
    // Unlike with Deflater class, bNoWrap is always true.
    ThreadedDeflater(sal_Int32 nSetLevel);
    ~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE;
    void startDeflate(const css::uno::Sequence<sal_Int8>& rBuffer);
    void waitForTasks();
    bool finished() const;
    css::uno::Sequence<sal_Int8> getOutput() const;
    void deflateWrite(
        const css::uno::Reference<css::io::XInputStream>& xInStream,
        std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
        std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc);
    sal_Int64 getTotalIn() const { return totalIn; }
    sal_Int64 getTotalOut() const { return totalOut; }

private:
    void processDeflatedBuffers();
    void clear();
};

diff --git a/package/source/zipapi/ThreadedDeflater.cxx b/package/source/zipapi/ThreadedDeflater.cxx
index 19bbda0..73725c5 100644
--- a/package/source/zipapi/ThreadedDeflater.cxx
+++ b/package/source/zipapi/ThreadedDeflater.cxx
@@ -44,14 +44,19 @@ class ThreadedDeflater::Task : public comphelper::ThreadTask
    ThreadedDeflater* deflater;
    int sequence;
    int blockSize;
    bool firstTask : 1;
    bool lastTask : 1;

public:
    Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_)
    Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_, bool firstTask_,
         bool lastTask_)
        : comphelper::ThreadTask(deflater_->threadTaskTag)
        , stream()
        , deflater(deflater_)
        , sequence(sequence_)
        , blockSize(blockSize_)
        , firstTask(firstTask_)
        , lastTask(lastTask_)
    {
    }

@@ -61,58 +66,83 @@ private:

ThreadedDeflater::ThreadedDeflater(sal_Int32 nSetLevel)
    : threadTaskTag(comphelper::ThreadPool::createThreadTaskTag())
    , totalIn(0)
    , totalOut(0)
    , zlibLevel(nSetLevel)
    , pendingTasksCount(0)
{
}

ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE
{
    waitForTasks();
    clear();
}
ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE { clear(); }

void ThreadedDeflater::startDeflate(const uno::Sequence<sal_Int8>& rBuffer)
void ThreadedDeflater::deflateWrite(
    const css::uno::Reference<css::io::XInputStream>& xInStream,
    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc)
{
    inBuffer = rBuffer;
    sal_Int64 size = inBuffer.getLength();
    int tasksCount = (size + MaxBlockSize - 1) / MaxBlockSize;
    tasksCount = std::max(tasksCount, 1);
    pendingTasksCount = tasksCount;
    outBuffers.resize(pendingTasksCount);
    for (int sequence = 0; sequence < tasksCount; ++sequence)
    sal_Int64 nThreadCount = comphelper::ThreadPool::getSharedOptimalPool().getWorkerCount();
    sal_Int64 batchSize = MaxBlockSize * nThreadCount;
    inBuffer.realloc(batchSize);
    prevDataBlock.realloc(MaxBlockSize);
    outBuffers.resize(nThreadCount);
    maProcessOutputFunc = aProcessOutputFunc;
    bool firstTask = true;

    while (xInStream->available() > 0)
    {
        sal_Int64 thisSize = std::min(MaxBlockSize, size);
        size -= thisSize;
        comphelper::ThreadPool::getSharedOptimalPool().pushTask(
            std::make_unique<Task>(this, sequence, thisSize));
        sal_Int64 inputBytes = xInStream->readBytes(inBuffer, batchSize);
        aProcessInputFunc(inBuffer, inputBytes);
        totalIn += inputBytes;
        int sequence = 0;
        bool lastBatch = xInStream->available() <= 0;
        sal_Int64 bytesPending = inputBytes;
        while (bytesPending > 0)
        {
            sal_Int64 taskSize = std::min(MaxBlockSize, bytesPending);
            bytesPending -= taskSize;
            bool lastTask = lastBatch && !bytesPending;
            comphelper::ThreadPool::getSharedOptimalPool().pushTask(
                std::make_unique<Task>(this, sequence++, taskSize, firstTask, lastTask));

            if (firstTask)
                firstTask = false;
        }

        assert(bytesPending == 0);

        comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);

        if (!lastBatch)
        {
            assert(inputBytes == batchSize);
            std::copy_n(inBuffer.begin() + (batchSize - MaxBlockSize), MaxBlockSize,
                        prevDataBlock.begin());
        }

        processDeflatedBuffers();
    }
    assert(size == 0);
}

bool ThreadedDeflater::finished() const { return pendingTasksCount == 0; }

css::uno::Sequence<sal_Int8> ThreadedDeflater::getOutput() const
void ThreadedDeflater::processDeflatedBuffers()
{
    assert(finished());
    sal_Int64 totalSize = 0;
    sal_Int64 batchOutputSize = 0;
    for (const auto& buffer : outBuffers)
        totalSize += buffer.size();
    uno::Sequence<sal_Int8> outBuffer(totalSize);
        batchOutputSize += buffer.size();

    css::uno::Sequence<sal_Int8> outBuffer(batchOutputSize);

    auto pos = outBuffer.begin();
    for (const auto& buffer : outBuffers)
    for (auto& buffer : outBuffers)
    {
        pos = std::copy(buffer.begin(), buffer.end(), pos);
    return outBuffer;
}
        buffer.clear();
    }

void ThreadedDeflater::waitForTasks()
{
    comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
    maProcessOutputFunc(outBuffer, batchOutputSize);
    totalOut += batchOutputSize;
}

void ThreadedDeflater::clear()
{
    assert(finished());
    inBuffer = uno::Sequence<sal_Int8>();
    outBuffers.clear();
}
@@ -147,27 +177,35 @@ void ThreadedDeflater::Task::doWork()
    // zlib doesn't handle const properly
    unsigned char* inBufferPtr = reinterpret_cast<unsigned char*>(
        const_cast<signed char*>(deflater->inBuffer.getConstArray()));
    if (sequence != 0)
    if (!firstTask)
    {
        // the window size is 32k, so set last 32k of previous data as the dictionary
        assert(MAX_WBITS == 15);
        assert(MaxBlockSize >= 32768);
        deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
        if (sequence > 0)
        {
            deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
        }
        else
        {
            unsigned char* prevBufferPtr = reinterpret_cast<unsigned char*>(
                const_cast<signed char*>(deflater->prevDataBlock.getConstArray()));
            deflateSetDictionary(&stream, prevBufferPtr + MaxBlockSize - 32768, 32768);
        }
    }
    stream.next_in = inBufferPtr + myInBufferStart;
    stream.avail_in = blockSize;
    stream.next_out = reinterpret_cast<unsigned char*>(deflater->outBuffers[sequence].data());
    stream.avail_out = outputMaxSize;
    bool last = sequence == int(deflater->outBuffers.size() - 1); // Last block?

    // The trick is in using Z_SYNC_FLUSH instead of Z_NO_FLUSH. It will align the data at a byte boundary,
    // and since we use a raw stream, the data blocks then can be simply concatenated.
    int res = deflate(&stream, last ? Z_FINISH : Z_SYNC_FLUSH);
    int res = deflate(&stream, lastTask ? Z_FINISH : Z_SYNC_FLUSH);
    assert(stream.avail_in == 0); // Check that everything has been deflated.
    if (last ? res == Z_STREAM_END : res == Z_OK)
    if (lastTask ? res == Z_STREAM_END : res == Z_OK)
    { // ok
        sal_Int64 outSize = outputMaxSize - stream.avail_out;
        deflater->outBuffers[sequence].resize(outSize);
        --deflater->pendingTasksCount;
    }
    else
    {
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index bee9d0a..f08e687 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -363,28 +363,18 @@ ZipOutputEntryParallel::ZipOutputEntryParallel(

void ZipOutputEntryParallel::writeStream(const uno::Reference< io::XInputStream >& xInStream)
{
    sal_Int64 toRead = xInStream->available();
    uno::Sequence< sal_Int8 > inBuffer( toRead );
    sal_Int64 read = xInStream->readBytes(inBuffer, toRead);
    if (read < toRead)
        inBuffer.realloc( read );
    while( xInStream->available() > 0 )
    {   // We didn't get the full size from available().
        uno::Sequence< sal_Int8 > buf( xInStream->available());
        read = xInStream->readBytes( buf, xInStream->available());
        sal_Int64 oldSize = inBuffer.getLength();
        inBuffer.realloc( oldSize + read );
        std::copy( buf.begin(), buf.end(), inBuffer.begin() + oldSize );
    }
    ZipUtils::ThreadedDeflater deflater( DEFAULT_COMPRESSION );
    totalIn = inBuffer.getLength();
    deflater.startDeflate( inBuffer );
    processInput( inBuffer );
    deflater.waitForTasks();
    uno::Sequence< sal_Int8 > outBuffer = deflater.getOutput();
    deflater.clear(); // release memory
    totalOut = outBuffer.getLength();
    processDeflated(outBuffer, outBuffer.getLength());
    deflater.deflateWrite(xInStream,
            [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
                if (!m_bEncryptCurrentEntry)
                    m_aCRC.updateSegment(rBuffer, nLen);
            },
            [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
                processDeflated(rBuffer, nLen);
            }
    );
    totalIn = deflater.getTotalIn();
    totalOut = deflater.getTotalOut();
    closeEntry();
}