split out thread functionality from ZipOutputEntry

The threading code can easily be separated out; it looked as if it had
been hacked in. I will also need to refactor the class further, so it
shouldn't be more complex than necessary.

Change-Id: I302da55409e9195274907ca4939c37fbb2427b18
Reviewed-on: https://gerrit.libreoffice.org/73031
Tested-by: Jenkins
Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
diff --git a/include/package/Deflater.hxx b/include/package/Deflater.hxx
index 1fbff8c..3cd528b 100644
--- a/include/package/Deflater.hxx
+++ b/include/package/Deflater.hxx
@@ -45,12 +45,12 @@ public:
    ~Deflater();
    Deflater(sal_Int32 nSetLevel, bool bNowrap);
    void setInputSegment( const css::uno::Sequence< sal_Int8 >& rBuffer );
    bool needsInput(  );
    bool needsInput() const;
    void finish(  );
    bool finished(  ) { return bFinished;}
    bool finished() const { return bFinished;}
    sal_Int32 doDeflateSegment( css::uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nNewLength );
    sal_Int64 getTotalIn(  );
    sal_Int64 getTotalOut(  );
    sal_Int64 getTotalIn() const;
    sal_Int64 getTotalOut() const;
    void reset(  );
    void end(  );
};
diff --git a/package/inc/ZipOutputEntry.hxx b/package/inc/ZipOutputEntry.hxx
index c35da58..af6528f 100644
--- a/package/inc/ZipOutputEntry.hxx
+++ b/package/inc/ZipOutputEntry.hxx
@@ -27,6 +27,7 @@
#include <com/sun/star/xml/crypto/XDigestContext.hpp>

#include <package/Deflater.hxx>
#include <comphelper/threadpool.hxx>
#include "CRC32.hxx"
#include <atomic>

@@ -36,25 +37,20 @@ class ZipPackageStream;

class ZipOutputEntry
{
    // allow only DeflateThreadTask to change m_bFinished using setFinished()
    friend class DeflateThreadTask;

protected:
    css::uno::Sequence< sal_Int8 > m_aDeflateBuffer;
    ZipUtils::Deflater m_aDeflater;
    css::uno::Reference< css::uno::XComponentContext > m_xContext;
    OUString m_aTempURL;
    css::uno::Reference< css::io::XOutputStream > m_xOutStream;

    css::uno::Reference< css::xml::crypto::XCipherContext > m_xCipherContext;
    css::uno::Reference< css::xml::crypto::XDigestContext > m_xDigestContext;
    std::exception_ptr m_aParallelDeflateException;

    CRC32               m_aCRC;
    ZipEntry            *m_pCurrentEntry;
    sal_Int16           m_nDigested;
    ZipPackageStream*   m_pCurrentStream;
    bool const          m_bEncryptCurrentEntry;
    std::atomic<bool>   m_bFinished;

public:
    ZipOutputEntry(
@@ -62,32 +58,49 @@ public:
        const css::uno::Reference< css::uno::XComponentContext >& rxContext,
        ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt);

    ~ZipOutputEntry();
    ZipEntry* getZipEntry() { return m_pCurrentEntry; }
    ZipPackageStream* getZipPackageStream() { return m_pCurrentStream; }
    bool isEncrypt() { return m_bEncryptCurrentEntry; }

    /* This block of methods is for threaded zipping, where we compress to a temp stream, whose
       data is retrieved via getData */
    void closeEntry();

    void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream);
    void write(const css::uno::Sequence< sal_Int8 >& rBuffer);

protected:
    ZipOutputEntry(
        const css::uno::Reference< css::io::XOutputStream >& rxOutStream,
        const css::uno::Reference< css::uno::XComponentContext >& rxContext,
        ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt, bool checkStream);
    void doDeflate();
};

// ZipOutputEntry variant whose compression is run from a comphelper::ThreadTask
// (see createTask()). The compressed data is written to a temp file whose URL
// is kept in m_aTempURL and can be read back with getData().
class ZipOutputEntryInThread : public ZipOutputEntry
{
    // Task implementation returned by createTask().
    class Task;
    // URL of the temp file; filled in by createBufferFile().
    OUString m_aTempURL;
    // Exception stored via setParallelDeflateException(); read back with
    // getParallelDeflateException().
    std::exception_ptr m_aParallelDeflateException;
    // Becomes true once setFinished() has been called.
    std::atomic<bool>   m_bFinished;

public:
    // Constructs the entry without an output stream; createBufferFile()
    // opens one later.
    ZipOutputEntryInThread(
        const css::uno::Reference< css::uno::XComponentContext >& rxContext,
        ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt);
    // Returns a new task that compresses xInStream into this entry.
    std::unique_ptr<comphelper::ThreadTask> createTask(
        const std::shared_ptr<comphelper::ThreadTaskTag>& pTag,
        const css::uno::Reference< css::io::XInputStream >& xInStream );
    /* This block of methods is for threaded zipping, where we compress to a temp stream, whose
       data is retrieved via getData */
    // Creates the temp file and opens m_xOutStream on it.
    void createBufferFile();
    // Stores an exception for later retrieval.
    void setParallelDeflateException(const std::exception_ptr& exception) { m_aParallelDeflateException = exception; }
    // Opens the temp file for reading.
    css::uno::Reference< css::io::XInputStream > getData() const;
    // Returns the stored exception (a null exception_ptr if none was set).
    const std::exception_ptr& getParallelDeflateException() const { return m_aParallelDeflateException; }
    // Closes and releases m_xOutStream.
    void closeBufferFile();
    // Deletes the temp file named by m_aTempURL.
    void deleteBufferFile();

    // NOTE(review): the accessors and the closeEntry()/write()/doDeflate()
    // declarations below repeat names that ZipOutputEntry also declares in
    // this patch - confirm whether redeclaring them here is intended.
    ZipEntry* getZipEntry() { return m_pCurrentEntry; }
    ZipPackageStream* getZipPackageStream() { return m_pCurrentStream; }
    bool isEncrypt() { return m_bEncryptCurrentEntry; }

    void closeEntry();
    void write(const css::uno::Sequence< sal_Int8 >& rBuffer);

    // True once setFinished() has been called.
    bool isFinished() const { return m_bFinished; }

private:
    // Marks the entry as finished; Task::doWork() calls this on both the
    // success and the failure path.
    void setFinished() { m_bFinished = true; }
    void doDeflate();
};

#endif
diff --git a/package/inc/ZipOutputStream.hxx b/package/inc/ZipOutputStream.hxx
index ff7b66d..b527abd 100644
--- a/package/inc/ZipOutputStream.hxx
+++ b/package/inc/ZipOutputStream.hxx
@@ -29,6 +29,7 @@

struct ZipEntry;
class ZipOutputEntry;
class ZipOutputEntryInThread;
class ZipPackageStream;

class ZipOutputStream
@@ -39,7 +40,7 @@ class ZipOutputStream

    ByteChucker         m_aChucker;
    ZipEntry            *m_pCurrentEntry;
    std::vector< ZipOutputEntry* > m_aEntries;
    std::vector< ZipOutputEntryInThread* > m_aEntries;
    std::exception_ptr m_aDeflateException;

public:
@@ -47,7 +48,7 @@ public:
        const css::uno::Reference< css::io::XOutputStream > &xOStream );
    ~ZipOutputStream();

    void addDeflatingThreadTask( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pThreadTask );
    void addDeflatingThreadTask( ZipOutputEntryInThread *pEntry, std::unique_ptr<comphelper::ThreadTask> pThreadTask );

    /// @throws css::io::IOException
    /// @throws css::uno::RuntimeException
@@ -79,7 +80,7 @@ private:
    void writeEXT( const ZipEntry &rEntry );

    // ScheduledThread handling helpers
    void consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry> pCandidate);
    void consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntryInThread> pCandidate);
    void consumeFinishedScheduledThreadTaskEntries();

public:
diff --git a/package/source/zipapi/Deflater.cxx b/package/source/zipapi/Deflater.cxx
index 8c02c4f..eacbbc9 100644
--- a/package/source/zipapi/Deflater.cxx
+++ b/package/source/zipapi/Deflater.cxx
@@ -100,7 +100,7 @@ void Deflater::setInputSegment( const uno::Sequence< sal_Int8 >& rBuffer )
    nLength = rBuffer.getLength();
}

bool Deflater::needsInput(  )
bool Deflater::needsInput() const
{
    return nLength <=0;
}
@@ -113,11 +113,11 @@ sal_Int32 Deflater::doDeflateSegment( uno::Sequence< sal_Int8 >& rBuffer, sal_In
    OSL_ASSERT( !(nNewLength < 0 || nNewLength > rBuffer.getLength()));
    return doDeflateBytes(rBuffer, /*nNewOffset*/0, nNewLength);
}
sal_Int64 Deflater::getTotalIn(  )
sal_Int64 Deflater::getTotalIn() const
{
    return pStream->total_in; // FIXME64: zlib doesn't look 64bit clean here
}
sal_Int64 Deflater::getTotalOut(  )
sal_Int64 Deflater::getTotalOut() const
{
    return pStream->total_out; // FIXME64: zlib doesn't look 64bit clean here
}
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index 2b1447b..74281fd 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -46,7 +46,8 @@ ZipOutputEntry::ZipOutputEntry(
        const uno::Reference< uno::XComponentContext >& rxContext,
        ZipEntry& rEntry,
        ZipPackageStream* pStream,
        bool bEncrypt)
        bool bEncrypt,
        bool checkStream)
: m_aDeflateBuffer(n_ConstBufferSize)
, m_aDeflater(DEFAULT_COMPRESSION, true)
, m_xContext(rxContext)
@@ -55,10 +56,10 @@ ZipOutputEntry::ZipOutputEntry(
, m_nDigested(0)
, m_pCurrentStream(pStream)
, m_bEncryptCurrentEntry(bEncrypt)
, m_bFinished(false)
{
    assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
    assert(m_xOutStream.is());
    (void)checkStream;
    assert(!checkStream || m_xOutStream.is());
    if (m_bEncryptCurrentEntry)
    {
        m_xCipherContext = ZipFile::StaticGetCipher( m_xContext, pStream->GetEncryptionData(), true );
@@ -67,64 +68,13 @@ ZipOutputEntry::ZipOutputEntry(
}

ZipOutputEntry::ZipOutputEntry(
        const css::uno::Reference< css::io::XOutputStream >& rxOutput,
        const uno::Reference< uno::XComponentContext >& rxContext,
        ZipEntry& rEntry,
        ZipPackageStream* pStream,
        bool bEncrypt)
: m_aDeflateBuffer(n_ConstBufferSize)
, m_aDeflater(DEFAULT_COMPRESSION, true)
, m_xContext(rxContext)
, m_pCurrentEntry(&rEntry)
, m_nDigested(0)
, m_pCurrentStream(pStream)
, m_bEncryptCurrentEntry(bEncrypt)
, m_bFinished(false)
: ZipOutputEntry( rxOutput, rxContext, rEntry, pStream, bEncrypt, true)
{
    assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
    if (m_bEncryptCurrentEntry)
    {
        m_xCipherContext = ZipFile::StaticGetCipher( m_xContext, pStream->GetEncryptionData(), true );
        m_xDigestContext = ZipFile::StaticGetDigestContextForChecksum( m_xContext, pStream->GetEncryptionData() );
    }
}

// Out-of-line destructor with an empty body; nothing is released explicitly here.
ZipOutputEntry::~ZipOutputEntry()
{
}

void ZipOutputEntry::createBufferFile()
{
    assert(!m_xOutStream.is() && m_aTempURL.isEmpty() &&
           "should only be called in the threaded mode where there is no existing stream yet");
    uno::Reference < beans::XPropertySet > xTempFileProps(
            io::TempFile::create(m_xContext),
            uno::UNO_QUERY_THROW );
    xTempFileProps->setPropertyValue("RemoveFile", uno::makeAny(false));
    uno::Any aUrl = xTempFileProps->getPropertyValue( "Uri" );
    aUrl >>= m_aTempURL;
    assert(!m_aTempURL.isEmpty());

    uno::Reference < ucb::XSimpleFileAccess3 > xTempAccess(ucb::SimpleFileAccess::create(m_xContext));
    m_xOutStream = xTempAccess->openFileWrite(m_aTempURL);
}

// Closes the output side of m_xOutStream and then drops the reference.
// The reference is cleared only after closeOutput() has returned, so if that
// call throws, m_xOutStream stays set.
void ZipOutputEntry::closeBufferFile()
{
    m_xOutStream->closeOutput();
    m_xOutStream.clear();
}

// Deletes the temp file referenced by m_aTempURL.
void ZipOutputEntry::deleteBufferFile()
{
    // The stream has to be released already, and a temp file must have been created.
    assert(!m_xOutStream.is() && !m_aTempURL.isEmpty());
    ucb::SimpleFileAccess::create(m_xContext)->kill(m_aTempURL);
}

// Opens the temp file referenced by m_aTempURL for reading and returns the
// resulting input stream.
uno::Reference< io::XInputStream > ZipOutputEntry::getData() const
{
    return ucb::SimpleFileAccess::create(m_xContext)->openFileRead(m_aTempURL);
}

void ZipOutputEntry::closeEntry()
@@ -241,4 +191,114 @@ void ZipOutputEntry::doDeflate()
    }
}

// Delegates to the ZipOutputEntry constructor with an empty output stream
// reference and false as the last argument (declared as checkStream in the
// header), and starts with m_bFinished == false.
ZipOutputEntryInThread::ZipOutputEntryInThread(
        const uno::Reference< uno::XComponentContext >& rxContext,
        ZipEntry& rEntry,
        ZipPackageStream* pStream,
        bool bEncrypt)
: ZipOutputEntry( uno::Reference< css::io::XOutputStream >(), rxContext, rEntry, pStream, bEncrypt, false )
, m_bFinished(false)
{
}

void ZipOutputEntryInThread::createBufferFile()
{
    assert(!m_xOutStream.is() && m_aTempURL.isEmpty() &&
           "should only be called in the threaded mode where there is no existing stream yet");
    uno::Reference < beans::XPropertySet > xTempFileProps(
            io::TempFile::create(m_xContext),
            uno::UNO_QUERY_THROW );
    xTempFileProps->setPropertyValue("RemoveFile", uno::makeAny(false));
    uno::Any aUrl = xTempFileProps->getPropertyValue( "Uri" );
    aUrl >>= m_aTempURL;
    assert(!m_aTempURL.isEmpty());

    uno::Reference < ucb::XSimpleFileAccess3 > xTempAccess(ucb::SimpleFileAccess::create(m_xContext));
    m_xOutStream = xTempAccess->openFileWrite(m_aTempURL);
}

// Closes the output side of m_xOutStream and then drops the reference.
// The reference is cleared only after closeOutput() has returned, so if that
// call throws, m_xOutStream stays set.
void ZipOutputEntryInThread::closeBufferFile()
{
    m_xOutStream->closeOutput();
    m_xOutStream.clear();
}

// Deletes the temp file referenced by m_aTempURL.
void ZipOutputEntryInThread::deleteBufferFile()
{
    // The stream has to be released already, and a temp file must have been created.
    assert(!m_xOutStream.is() && !m_aTempURL.isEmpty());
    ucb::SimpleFileAccess::create(m_xContext)->kill(m_aTempURL);
}

// Opens the temp file referenced by m_aTempURL for reading and returns the
// resulting input stream.
uno::Reference< io::XInputStream > ZipOutputEntryInThread::getData() const
{
    return ucb::SimpleFileAccess::create(m_xContext)->openFileRead(m_aTempURL);
}

// Task that compresses one input stream into the temp file of a
// ZipOutputEntryInThread. Instances are created by
// ZipOutputEntryInThread::createTask().
class ZipOutputEntryInThread::Task : public comphelper::ThreadTask
{
    // Entry being written; held as a raw pointer and never deleted by this class.
    ZipOutputEntryInThread *mpEntry;
    // Source data; released in doWork() once it has been consumed.
    uno::Reference< io::XInputStream > mxInStream;

public:
    Task( const std::shared_ptr<comphelper::ThreadTaskTag>& pTag, ZipOutputEntryInThread *pEntry,
          const uno::Reference< io::XInputStream >& xInStream )
        : comphelper::ThreadTask(pTag)
        , mpEntry(pEntry)
        , mxInStream(xInStream)
    {}

private:
    // Creates the temp file, writes the whole input stream through the entry,
    // closes the temp file and marks the entry as finished. No exception
    // leaves this function: any failure is stored on the entry instead.
    virtual void doWork() override
    {
        try
        {
            mpEntry->createBufferFile();
            mpEntry->writeStream(mxInStream);
            mxInStream.clear();
            mpEntry->closeBufferFile();
            mpEntry->setFinished();
        }
        catch (...)
        {
            // Record the failure first so it is in place before the entry is
            // flagged as finished below.
            mpEntry->setParallelDeflateException(std::current_exception());
            try
            {
                // Best-effort cleanup of whatever createBufferFile() managed to set up.
                if (mpEntry->m_xOutStream.is())
                    mpEntry->closeBufferFile();
                if (!mpEntry->m_aTempURL.isEmpty())
                    mpEntry->deleteBufferFile();
            }
            catch (uno::Exception const&)
            {
                // Deliberately ignored: the original exception is already stored.
            }
            mpEntry->setFinished();
        }
    }
};

// Creates the Task that will compress xInStream into this entry.
// The task stores a raw pointer to this object, so this entry presumably has
// to stay alive until the task has run - NOTE(review): confirm against the
// ownership handling in ZipOutputStream.
std::unique_ptr<comphelper::ThreadTask> ZipOutputEntryInThread::createTask(
    const std::shared_ptr<comphelper::ThreadTaskTag>& pTag,
    const uno::Reference< io::XInputStream >& xInStream )
{
    return std::make_unique<Task>(pTag, this, xInStream);
}

// Reads xInStream in chunks of n_ConstBufferSize bytes, passes every chunk
// (including the final short or empty one) to write(), and finally calls
// closeEntry().
void ZipOutputEntry::writeStream(const uno::Reference< io::XInputStream >& xInStream)
{
    uno::Sequence< sal_Int8 > aChunk(n_ConstBufferSize);
    for (;;)
    {
        const sal_Int32 nRead = xInStream->readBytes(aChunk, n_ConstBufferSize);
        // A read that does not fill the buffer is the last chunk.
        const bool bLastChunk = (nRead != n_ConstBufferSize);
        if (bLastChunk)
            aChunk.realloc(nRead);

        write(aChunk);

        if (bLastChunk)
            break;
    }
    closeEntry();
}

/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx
index 5d90224..8ea040b 100644
--- a/package/source/zipapi/ZipOutputStream.cxx
+++ b/package/source/zipapi/ZipOutputStream.cxx
@@ -68,7 +68,7 @@ void ZipOutputStream::setEntry( ZipEntry *pEntry )
    }
}

void ZipOutputStream::addDeflatingThreadTask( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pTask )
void ZipOutputStream::addDeflatingThreadTask( ZipOutputEntryInThread *pEntry, std::unique_ptr<comphelper::ThreadTask> pTask )
{
    comphelper::ThreadPool::getSharedOptimalPool().pushTask(std::move(pTask));
    m_aEntries.push_back(pEntry);
@@ -91,7 +91,7 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt )
    m_pCurrentEntry = nullptr;
}

void ZipOutputStream::consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry> pCandidate)
void ZipOutputStream::consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntryInThread> pCandidate)
{
    //Any exceptions thrown in the threads were caught and stored for now
    const std::exception_ptr& rCaughtException(pCandidate->getParallelDeflateException());
@@ -126,13 +126,13 @@ void ZipOutputStream::consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputE

void ZipOutputStream::consumeFinishedScheduledThreadTaskEntries()
{
    std::vector< ZipOutputEntry* > aNonFinishedEntries;
    std::vector< ZipOutputEntryInThread* > aNonFinishedEntries;

    for(ZipOutputEntry* pEntry : m_aEntries)
    for(ZipOutputEntryInThread* pEntry : m_aEntries)
    {
        if(pEntry->isFinished())
        {
            consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry>(pEntry));
            consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntryInThread>(pEntry));
        }
        else
        {
@@ -167,9 +167,9 @@ void ZipOutputStream::finish()
    // consume all processed entries
    while(!m_aEntries.empty())
    {
        ZipOutputEntry* pCandidate = m_aEntries.back();
        ZipOutputEntryInThread* pCandidate = m_aEntries.back();
        m_aEntries.pop_back();
        consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry>(pCandidate));
        consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntryInThread>(pCandidate));
    }

    sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition());
diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx
index 5693681..e795776 100644
--- a/package/source/zippackage/ZipPackageStream.cxx
+++ b/package/source/zippackage/ZipPackageStream.cxx
@@ -17,7 +17,6 @@
 *   the License at http://www.apache.org/licenses/LICENSE-2.0 .
 */

#include <memory>
#include <ZipPackageStream.hxx>

#include <com/sun/star/beans/PropertyValue.hpp>
@@ -431,65 +430,6 @@ bool ZipPackageStream::ParsePackageRawStream()
    return true;
}

// Reads xInStream in chunks of n_ConstBufferSize bytes, passes every chunk
// (including the final short or empty one) to pZipEntry->write(), and finally
// closes the entry.
static void deflateZipEntry(ZipOutputEntry *pZipEntry,
        const uno::Reference< io::XInputStream >& xInStream)
{
    uno::Sequence< sal_Int8 > aChunk(n_ConstBufferSize);
    for (;;)
    {
        const sal_Int32 nRead = xInStream->readBytes(aChunk, n_ConstBufferSize);
        // A read that does not fill the buffer is the last chunk.
        const bool bLastChunk = (nRead != n_ConstBufferSize);
        if (bLastChunk)
            aChunk.realloc(nRead);

        pZipEntry->write(aChunk);

        if (bLastChunk)
            break;
    }
    pZipEntry->closeEntry();
}

// Task that compresses one input stream into the temp file of a
// ZipOutputEntry. ZipOutputEntry declares this class as a friend so that it
// can call setFinished().
class DeflateThreadTask: public comphelper::ThreadTask
{
    // Entry being written; held as a raw pointer and never deleted by this class.
    ZipOutputEntry *mpEntry;
    // Source data; released in doWork() once it has been consumed.
    uno::Reference< io::XInputStream > mxInStream;

public:
    DeflateThreadTask( const std::shared_ptr<comphelper::ThreadTaskTag>& pTag, ZipOutputEntry *pEntry,
                       const uno::Reference< io::XInputStream >& xInStream )
        : comphelper::ThreadTask(pTag)
        , mpEntry(pEntry)
        , mxInStream(xInStream)
    {}

private:
    // Creates the temp file, deflates the whole input stream into it, closes
    // the temp file and marks the entry as finished. No exception leaves this
    // function: any failure is stored on the entry instead.
    virtual void doWork() override
    {
        try
        {
            mpEntry->createBufferFile();
            deflateZipEntry(mpEntry, mxInStream);
            mxInStream.clear();
            mpEntry->closeBufferFile();
            mpEntry->setFinished();
        }
        catch (...)
        {
            // Record the failure first so it is in place before the entry is
            // flagged as finished below.
            mpEntry->setParallelDeflateException(std::current_exception());
            try
            {
                // Best-effort cleanup of whatever createBufferFile() managed to set up.
                if (mpEntry->m_xOutStream.is())
                    mpEntry->closeBufferFile();
                if (!mpEntry->m_aTempURL.isEmpty())
                    mpEntry->deleteBufferFile();
            }
            catch (uno::Exception const&)
            {
                // Deliberately ignored: the original exception is already stored.
            }
            mpEntry->setFinished();
        }
    }
};

static void ImplSetStoredData( ZipEntry & rEntry, uno::Reference< io::XInputStream> const & rStream )
{
    // It's very annoying that we have to do this, but lots of zip packages
@@ -839,16 +779,16 @@ bool ZipPackageStream::saveChild(
                    rZipOut.reduceScheduledThreadTasksToGivenNumberOrLess(nAllowedTasks);

                    // Start a new thread task deflating this zip entry
                    ZipOutputEntry *pZipEntry = new ZipOutputEntry(
                    ZipOutputEntryInThread *pZipEntry = new ZipOutputEntryInThread(
                            m_xContext, *pTempEntry, this, bToBeEncrypted);
                    rZipOut.addDeflatingThreadTask( pZipEntry,
                            std::make_unique<DeflateThreadTask>(rZipOut.getThreadTaskTag(), pZipEntry, xStream) );
                            pZipEntry->createTask( rZipOut.getThreadTaskTag(), xStream) );
                }
                else
                {
                    rZipOut.writeLOC(pTempEntry, bToBeEncrypted);
                    ZipOutputEntry aZipEntry(rZipOut.getStream(), m_xContext, *pTempEntry, this, bToBeEncrypted);
                    deflateZipEntry(&aZipEntry, xStream);
                    aZipEntry.writeStream(xStream);
                    rZipOut.rawCloseEntry(bToBeEncrypted);
                }
            }