First cut at moving unzipping into new thread:

XBufferedThreadedStream class buffers data in a new thread,
which will be available to be read from parent thread.

Change-Id: I62d367fa1dec23da39aba24b5c765b57707956bb
Reviewed-on: https://gerrit.libreoffice.org/38135
Tested-by: Jenkins <ci@libreoffice.org>
Reviewed-by: Michael Meeks <michael.meeks@collabora.com>
diff --git a/package/Library_package2.mk b/package/Library_package2.mk
index 3096a97..0ff715e 100644
--- a/package/Library_package2.mk
+++ b/package/Library_package2.mk
@@ -31,6 +31,7 @@ $(eval $(call gb_Library_use_libraries,package2,\
	sal \
	sax \
	ucbhelper \
	salhelper \
))

$(eval $(call gb_Library_use_externals,package2,\
@@ -51,6 +52,7 @@ $(eval $(call gb_Library_add_exception_objects,package2,\
	package/source/zipapi/Deflater \
	package/source/zipapi/Inflater \
	package/source/zipapi/sha1context \
	package/source/zipapi/XBufferedThreadedStream \
	package/source/zipapi/XUnbufferedStream \
	package/source/zipapi/ZipEnumeration \
	package/source/zipapi/ZipFile \
diff --git a/package/qa/cppunit/test_package.cxx b/package/qa/cppunit/test_package.cxx
index 335f490..0e1f477 100644
--- a/package/qa/cppunit/test_package.cxx
+++ b/package/qa/cppunit/test_package.cxx
@@ -27,19 +27,55 @@ namespace
    public:
        PackageTest() {}

        virtual void setUp() override;

        virtual bool load(const OUString &,
            const OUString &rURL, const OUString &,
            SfxFilterFlags, SotClipboardFormatId, unsigned int) override;

        void test();
        void testThreadedStreams();
        void testBufferedThreadedStreams();

        CPPUNIT_TEST_SUITE(PackageTest);
        CPPUNIT_TEST(test);
        CPPUNIT_TEST(testThreadedStreams);
        CPPUNIT_TEST(testBufferedThreadedStreams);
        CPPUNIT_TEST_SUITE_END();

    private:
        uno::Reference<container::XNameAccess> mxNA;
        void verifyStreams( std::vector<std::vector<char>> &aBuffers );
    };

    void PackageTest::setUp()
    {
        BootstrapFixtureBase::setUp();
        OUString aURL = m_directories.getURLFromSrc("/package/qa/cppunit/data/a2z.zip");

        uno::Sequence<beans::NamedValue> aNVs(2);
        aNVs[0].Name = "URL";
        aNVs[0].Value <<= aURL;
        aNVs[1].Name = "UseBufferedStream";
        aNVs[1].Value <<= true;

        uno::Sequence<uno::Any> aArgs(1);
        aArgs[0] <<= aNVs;

        uno::Reference<uno::XComponentContext> xCxt = comphelper::getProcessComponentContext();
        uno::Reference<lang::XMultiComponentFactory> xSvcMgr = xCxt->getServiceManager();

        uno::Reference<packages::zip::XZipFileAccess2> xZip(
            xSvcMgr->createInstanceWithArgumentsAndContext(
                "com.sun.star.packages.zip.ZipFileAccess", aArgs, xCxt),
            uno::UNO_QUERY);

        CPPUNIT_ASSERT(xZip.is());

        mxNA = uno::Reference<container::XNameAccess>(xZip, uno::UNO_QUERY);
        CPPUNIT_ASSERT(mxNA.is());
    }

    bool PackageTest::load(const OUString &,
        const OUString &rURL, const OUString &,
        SfxFilterFlags, SotClipboardFormatId, unsigned int)
@@ -62,6 +98,20 @@ namespace
            m_directories.getURLFromSrc("/package/qa/cppunit/data/"));
    }

    void PackageTest::verifyStreams( std::vector<std::vector<char>> &aBuffers )
    {
            CPPUNIT_ASSERT_EQUAL(size_t(26), aBuffers.size());
            auto itBuf = aBuffers.begin();

            for (char c = 'a'; c <= 'z'; ++c, ++itBuf)
            {
                const std::vector<char>& rBuf = *itBuf;
                CPPUNIT_ASSERT_EQUAL(size_t(1048576), rBuf.size()); // 1 MB each.
                for (char check : rBuf)
                    CPPUNIT_ASSERT_EQUAL(c, check);
            }
    }

    // TODO : This test currently doesn't fail even when you set
    // UseBufferedStream to false. Look into this and replace it with a better
    // test that actually fails when the aforementioned flag is set to false.
@@ -95,30 +145,6 @@ namespace
            }
        };

        OUString aURL = m_directories.getURLFromSrc("/package/qa/cppunit/data/a2z.zip");

        uno::Sequence<beans::NamedValue> aNVs(2);
        aNVs[0].Name = "URL";
        aNVs[0].Value <<= aURL;
        aNVs[1].Name = "UseBufferedStream";
        aNVs[1].Value <<= true;

        uno::Sequence<uno::Any> aArgs(1);
        aArgs[0] <<= aNVs;

        uno::Reference<uno::XComponentContext> xCxt = comphelper::getProcessComponentContext();
        uno::Reference<lang::XMultiComponentFactory> xSvcMgr = xCxt->getServiceManager();

        uno::Reference<packages::zip::XZipFileAccess2> xZip(
            xSvcMgr->createInstanceWithArgumentsAndContext(
                "com.sun.star.packages.zip.ZipFileAccess", aArgs, xCxt),
            uno::UNO_QUERY);

        CPPUNIT_ASSERT(xZip.is());

        uno::Reference<container::XNameAccess> xNA(xZip, uno::UNO_QUERY);
        CPPUNIT_ASSERT(xNA.is());

        {
            comphelper::ThreadPool aPool(4);
            std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag();
@@ -132,26 +158,50 @@ namespace
                aName += ".txt";

                uno::Reference<io::XInputStream> xStrm;
                xNA->getByName(aName) >>= xStrm;
                mxNA->getByName(aName) >>= xStrm;

                CPPUNIT_ASSERT(xStrm.is());
                aPool.pushTask(new Worker(pTag, xStrm, *itBuf));
            }

            aPool.waitUntilDone(pTag);
            verifyStreams( aTestBuffers );
        }
    }

            // Verify the streams.
            CPPUNIT_ASSERT_EQUAL(size_t(26), aTestBuffers.size());
            itBuf = aTestBuffers.begin();
    void PackageTest::testBufferedThreadedStreams()
    {
        std::vector<std::vector<char>> aTestBuffers(26);
        auto itBuf = aTestBuffers.begin();
        sal_Int32 nReadSize = 0;

            for (char c = 'a'; c <= 'z'; ++c, ++itBuf)
        for (char c = 'a'; c <= 'z'; ++c, ++itBuf)
        {
            OUString aName(c);
            aName += ".txt";

            uno::Reference<io::XInputStream> xStrm;
            //Size of each stream is 1mb (>10000) => XBufferedThreadedStream
            mxNA->getByName(aName) >>= xStrm;

            CPPUNIT_ASSERT(xStrm.is());
            sal_Int32 nSize = xStrm->available();

            uno::Sequence<sal_Int8> aBytes;
            //Read chuncks of increasing size
            nReadSize += 1024;

            while (nSize > 0)
            {
                const std::vector<char>& rBuf = *itBuf;
                CPPUNIT_ASSERT_EQUAL(size_t(1048576), rBuf.size()); // 1 MB each.
                for (char check : rBuf)
                    CPPUNIT_ASSERT_EQUAL(c, check);
                sal_Int32 nBytesRead = xStrm->readBytes(aBytes, nReadSize);
                const sal_Int8* p = aBytes.getArray();
                const sal_Int8* pEnd = p + nBytesRead;
                std::copy(p, pEnd, std::back_inserter(*itBuf));
                nSize -= nBytesRead;
            }
        }

        verifyStreams( aTestBuffers );
    }

    CPPUNIT_TEST_SUITE_REGISTRATION(PackageTest);
diff --git a/package/source/zipapi/XBufferedThreadedStream.cxx b/package/source/zipapi/XBufferedThreadedStream.cxx
new file mode 100644
index 0000000..59a89f9
--- /dev/null
+++ b/package/source/zipapi/XBufferedThreadedStream.cxx
@@ -0,0 +1,200 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
 * This file is part of the LibreOffice project.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

#include <XBufferedThreadedStream.hxx>
#include <com/sun/star/packages/zip/ZipIOException.hpp>

using namespace css::uno;
using com::sun::star::packages::zip::ZipIOException;

namespace {

class UnzippingThread: public salhelper::Thread
{
    XBufferedThreadedStream &mxStream;
public:
    explicit UnzippingThread(XBufferedThreadedStream &xStream): Thread("Unzipping"), mxStream(xStream) {}
private:
    virtual void execute() override
    {
        try
        {
            mxStream.produce();
        }
        catch( const RuntimeException &e )
        {
            SAL_WARN("package", "RuntimeException from unbuffered Stream " << e.Message );
            mxStream.saveException( new RuntimeException( e ) );
        }
        catch( const ZipIOException &e )
        {
            SAL_WARN("package", "ZipIOException from unbuffered Stream " << e.Message );
            mxStream.saveException( new ZipIOException( e ) );
        }
        catch( const Exception &e )
        {
            SAL_WARN("package", "Unexpected exception " << e.Message );
            mxStream.saveException( new Exception( e ) );
        }

        mxStream.setTerminateThread();
    }
};

}

XBufferedThreadedStream::XBufferedThreadedStream(
                    const Reference<XInputStream>& xSrcStream )
: mxSrcStream( xSrcStream )
, mnPos(0)
, mnStreamSize( xSrcStream->available() )
, mnOffset( 0 )
, mxUnzippingThread( new UnzippingThread(*this) )
, mbTerminateThread( false )
, maSavedException( nullptr )
{
    mxUnzippingThread->launch();
}

XBufferedThreadedStream::~XBufferedThreadedStream()
{
    setTerminateThread();
    mxUnzippingThread->join();
}

/**
 * Reads from UnbufferedStream in a seperate thread and stores the buffer blocks
 * in maPendingBuffers queue for further use.
 */
void XBufferedThreadedStream::produce()
{
    Buffer pProducedBuffer;
    std::unique_lock<std::mutex> aGuard( maBufferProtector );
    do
    {
        if( !maUsedBuffers.empty() )
        {
            pProducedBuffer = maUsedBuffers.front();
            maUsedBuffers.pop();
        }

        aGuard.unlock();
        mxSrcStream->readBytes( pProducedBuffer, nBufferSize );

        aGuard.lock();
        maPendingBuffers.push( pProducedBuffer );
        maBufferConsumeResume.notify_one();
        maBufferProduceResume.wait( aGuard, [&]{return canProduce(); } );

        if( mbTerminateThread )
            break;

    } while( hasBytes() );
}

/**
 * Fetches next available block from maPendingBuffers for use in Reading thread.
 */
const Buffer& XBufferedThreadedStream::getNextBlock()
{
    const sal_Int32 nBufSize = maInUseBuffer.getLength();
    if( nBufSize <= 0 || mnOffset >= nBufSize )
    {
        std::unique_lock<std::mutex> aGuard( maBufferProtector );
        if( mnOffset >= nBufSize )
            maUsedBuffers.push( maInUseBuffer );

        maBufferConsumeResume.wait( aGuard, [&]{return canConsume(); } );

        if( maPendingBuffers.empty() )
        {
            maInUseBuffer = Buffer();
            if( maSavedException )
                throw *maSavedException;
        }
        else
        {
            maInUseBuffer = maPendingBuffers.front();
            maPendingBuffers.pop();
            mnOffset = 0;

            if( maPendingBuffers.size() <= nBufferLowWater )
                maBufferProduceResume.notify_one();
        }
    }

    return maInUseBuffer;
}

void XBufferedThreadedStream::setTerminateThread()
{
    mbTerminateThread = true;
    maBufferProduceResume.notify_one();
    maBufferConsumeResume.notify_one();
}

sal_Int32 SAL_CALL XBufferedThreadedStream::readBytes( Sequence< sal_Int8 >& rData, sal_Int32 nBytesToRead )
{
    if( !hasBytes() )
        return 0;

    const sal_Int32 nAvailableSize = std::min<sal_Int32>( nBytesToRead, remainingSize() );
    rData.realloc( nAvailableSize );
    sal_Int32 i = 0, nPendingBytes = nAvailableSize;

    while( nPendingBytes )
    {
        const Buffer &pBuffer = getNextBlock();
        if( pBuffer.getLength() <= 0 )
        {
            rData.realloc( nAvailableSize - nPendingBytes );
            return nAvailableSize - nPendingBytes;
        }
        const sal_Int32 limit = std::min<sal_Int32>( nPendingBytes, pBuffer.getLength() - mnOffset );

        memcpy( &rData[i], &pBuffer[mnOffset], limit );

        nPendingBytes -= limit;
        mnOffset += limit;
        mnPos += limit;
        i += limit;
    }

    return nAvailableSize;
}

sal_Int32 SAL_CALL XBufferedThreadedStream::readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead )
{
    return readBytes( aData, nMaxBytesToRead );
}
void SAL_CALL XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip )
{
    if( nBytesToSkip )
    {
        Sequence < sal_Int8 > aSequence( nBytesToSkip );
        readBytes( aSequence, nBytesToSkip );
    }
}

sal_Int32 SAL_CALL XBufferedThreadedStream::available()
{
    if( !hasBytes() )
        return 0;

    return remainingSize();
}

void SAL_CALL XBufferedThreadedStream::closeInput()
{
    setTerminateThread();
    mxUnzippingThread->join();
    mxSrcStream->closeInput();
}

/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/source/zipapi/XBufferedThreadedStream.hxx b/package/source/zipapi/XBufferedThreadedStream.hxx
new file mode 100644
index 0000000..b047b25
--- /dev/null
+++ b/package/source/zipapi/XBufferedThreadedStream.hxx
@@ -0,0 +1,79 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
 * This file is part of the LibreOffice project.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

#ifndef INCLUDED_PACKAGE_SOURCE_ZIPAPI_XBUFFEREDTHREADEDSTREAM_HXX
#define INCLUDED_PACKAGE_SOURCE_ZIPAPI_XBUFFEREDTHREADEDSTREAM_HXX

#include <salhelper/thread.hxx>
#include <XUnbufferedStream.hxx>
#include <queue>
#include <vector>
#include <mutex>
#include <condition_variable>

typedef css::uno::Sequence< sal_Int8 > Buffer;

class XBufferedThreadedStream : public cppu::WeakImplHelper< css::io::XInputStream >
{
private:
    const css::uno::Reference<XInputStream> mxSrcStream;
    size_t mnPos;                                           /// position in stream
    size_t mnStreamSize;                                    /// available size of stream

    Buffer maInUseBuffer;                                   /// Buffer block in use
    int mnOffset;                                           /// position in maInUseBuffer
    std::queue < Buffer > maPendingBuffers;                 /// Buffers that are available for use
    std::queue < Buffer > maUsedBuffers;

    rtl::Reference< salhelper::Thread > mxUnzippingThread;
    std::mutex maBufferProtector;                           /// mutex protecting Buffer queues.
    std::condition_variable maBufferConsumeResume;
    std::condition_variable maBufferProduceResume;
    bool mbTerminateThread;                                 /// indicates the failure of one of the threads

    css::uno::Exception *maSavedException;                  /// exception caught during unzipping is saved to be thrown during reading

    static const size_t nBufferLowWater = 2;
    static const size_t nBufferHighWater = 4;
    static const size_t nBufferSize = 32 * 1024;

    const Buffer& getNextBlock();
    size_t remainingSize() const { return mnStreamSize - mnPos; }
    bool hasBytes() const { return mnPos < mnStreamSize; }

    bool canProduce() const
    {
        return( mbTerminateThread || maPendingBuffers.size() < nBufferHighWater );
    }

    bool canConsume() const
    {
        return( mbTerminateThread || !maPendingBuffers.empty() );
    }

public:
    XBufferedThreadedStream(
                  const css::uno::Reference<XInputStream>& xSrcStream );

    virtual ~XBufferedThreadedStream() override;

    void produce();
    void setTerminateThread();
    void saveException( css::uno::Exception *e ) { maSavedException = e; }

    // XInputStream
    virtual sal_Int32 SAL_CALL readBytes( css::uno::Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead ) override;
    virtual sal_Int32 SAL_CALL readSomeBytes( css::uno::Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead ) override;
    virtual void SAL_CALL skipBytes( sal_Int32 nBytesToSkip ) override;
    virtual sal_Int32 SAL_CALL available(  ) override;
    virtual void SAL_CALL closeInput(  ) override;
};
#endif

/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/source/zipapi/ZipFile.cxx b/package/source/zipapi/ZipFile.cxx
index ba41d5f..ddea09b 100644
--- a/package/source/zipapi/ZipFile.cxx
+++ b/package/source/zipapi/ZipFile.cxx
@@ -44,6 +44,7 @@
#include <ZipFile.hxx>
#include <ZipEnumeration.hxx>
#include <XUnbufferedStream.hxx>
#include <XBufferedThreadedStream.hxx>
#include <PackageConstants.hxx>
#include <EncryptedDataHeader.hxx>
#include <EncryptionData.hxx>
@@ -625,7 +626,14 @@ uno::Reference< XInputStream > ZipFile::createStreamForZipEntry(
    if (!mbUseBufferedStream)
        return xSrcStream;

    uno::Reference<io::XInputStream> xBufStream(new XBufferedStream(xSrcStream));
    uno::Reference<io::XInputStream> xBufStream;
    static const sal_Int32 nThreadingThreshold = 10000;

    if( xSrcStream->available() > nThreadingThreshold )
        xBufStream = new XBufferedThreadedStream(xSrcStream);
    else
        xBufStream = new XBufferedStream(xSrcStream);

    return xBufStream;
}