Coverage Report - org.codehaus.plexus.archiver.zip.ConcurrentJarCreator
 
Classes in this File Line Coverage Branch Coverage Complexity
ConcurrentJarCreator
80%
58/72
66%
20/30
2.438
ConcurrentJarCreator$1
100%
2/2
N/A
2.438
ConcurrentJarCreator$2
50%
2/4
N/A
2.438
ConcurrentJarCreator$3
100%
2/2
50%
1/2
2.438
ConcurrentJarCreator$DeferredSupplier
100%
4/4
N/A
2.438
 
 1  
 /*
 2  
  *  Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  *  contributor license agreements.  See the NOTICE file distributed with
 4  
  *  this work for additional information regarding copyright ownership.
 5  
  *  The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  *  (the "License"); you may not use this file except in compliance with
 7  
  *  the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  *  Unless required by applicable law or agreed to in writing, software
 12  
  *  distributed under the License is distributed on an "AS IS" BASIS,
 13  
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  *  See the License for the specific language governing permissions and
 15  
  *  limitations under the License.
 16  
  *
 17  
  */
 18  
 package org.codehaus.plexus.archiver.zip;
 19  
 
 20  
 import java.io.ByteArrayInputStream;
 21  
 import java.io.IOException;
 22  
 import java.io.InputStream;
 23  
 import java.io.SequenceInputStream;
 24  
 import java.util.concurrent.ExecutionException;
 25  
 import java.util.concurrent.Executors;
 26  
 import java.util.zip.Deflater;
 27  
 import java.util.zip.ZipEntry;
 28  
 import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
 29  
 import org.apache.commons.compress.archivers.zip.ScatterZipOutputStream;
 30  
 import org.apache.commons.compress.archivers.zip.StreamCompressor;
 31  
 import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
 32  
 import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest;
 33  
 import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequestSupplier;
 34  
 import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
 35  
 import org.apache.commons.compress.parallel.InputStreamSupplier;
 36  
 import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
 37  
 import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
 38  
 import org.codehaus.plexus.util.IOUtil;
 39  
 
 40  
 import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
 41  
 
 42  46944
 public class ConcurrentJarCreator
 43  
 {
 44  
 
 45  
     private final boolean compressAddedZips;
 46  
 
 47  
     private final ScatterZipOutputStream directories;
 48  
 
 49  
     private final ScatterZipOutputStream metaInfDir;
 50  
 
 51  
     private final ScatterZipOutputStream manifest;
 52  
 
 53  
     private final ScatterZipOutputStream synchronousEntries;
 54  
 
 55  
     private final ParallelScatterZipCreator parallelScatterZipCreator;
 56  
 
 57  
     private long zipCloseElapsed;
 58  
 
 59  
     private static class DeferredSupplier
 60  
         implements ScatterGatherBackingStoreSupplier
 61  
     {
 62  
 
 63  
         private int threshold;
 64  
 
 65  
         DeferredSupplier( int threshold )
 66  52
         {
 67  52
             this.threshold = threshold;
 68  52
         }
 69  
 
 70  
         public ScatterGatherBackingStore get()
 71  
             throws IOException
 72  
         {
 73  343
             return new DeferredScatterOutputStream( threshold );
 74  
         }
 75  
 
 76  
     }
 77  
 
 78  
     public static ScatterZipOutputStream createDeferred(
 79  
         ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier )
 80  
         throws IOException
 81  
     {
 82  208
         ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
 83  208
         StreamCompressor sc = StreamCompressor.create( Deflater.DEFAULT_COMPRESSION, bs );
 84  208
         return new ScatterZipOutputStream( bs, sc );
 85  
     }
 86  
 
 87  
     /**
 88  
      * Creates a new {@code ConcurrentJarCreator} instance.
 89  
      * <p>
 90  
      * {@code ConcurrentJarCreator} creates zip files using several concurrent threads.</p>
 91  
      * <p>
 92  
      * This constructor has the same effect as
 93  
      * {@link #ConcurrentJarCreator(boolean, int) ConcurrentJarCreator(true, nThreads) }</p>
 94  
      *
 95  
      * @param nThreads The number of concurrent threads used to create the archive
 96  
      *
 97  
      * @throws IOException
 98  
      */
 99  
     public ConcurrentJarCreator( int nThreads ) throws IOException
 100  
     {
 101  0
         this( true, nThreads );
 102  0
     }
 103  
 
 104  
     /**
 105  
      * Creates a new {@code ConcurrentJarCreator} instance.
 106  
      * <p>
 107  
      * {@code ConcurrentJarCreator} creates zip files using several concurrent threads.
 108  
      * Entries that are already zip files can either be just stored or be compressed again.</p>
 109  
      *
 110  
      * @param compressAddedZips Indicates if entries that are zip files should be compressed.
 111  
      *                          If set to {@code false} entries that are zip files will be added using
 112  
      *                          {@link ZipEntry#STORED} method.
 113  
      *                          If set to {@code true} entries that are zip files will be added using
 114  
      *                          the compression method indicated by the {@code ZipArchiveEntry} passed
 115  
      *                          to {@link #addArchiveEntry(ZipArchiveEntry, InputStreamSupplier, boolean)}.
 116  
      *                          The compression method for all entries that are not zip files will not be changed
 117  
      *                          regardless of the value of this parameter
 118  
      * @param nThreads The number of concurrent threads used to create the archive
 119  
      *
 120  
      * @throws IOException if creating one of the underlying scatter zip output streams fails
 121  
      */
 122  
     public ConcurrentJarCreator( boolean compressAddedZips, int nThreads ) throws IOException
 123  52
     {
 124  52
         this.compressAddedZips = compressAddedZips;
 125  52
         ScatterGatherBackingStoreSupplier defaultSupplier = new DeferredSupplier( 100000000 / nThreads );
 126  52
         directories = createDeferred( defaultSupplier );
 127  52
         manifest = createDeferred( defaultSupplier );
 128  52
         metaInfDir = createDeferred( defaultSupplier );
 129  52
         synchronousEntries = createDeferred( defaultSupplier );
 130  52
         parallelScatterZipCreator = new ParallelScatterZipCreator( Executors.newFixedThreadPool( nThreads ),
 131  
                                                                    defaultSupplier );
 132  
 
 133  52
     }
 134  
 
 135  
     /**
 136  
      * Adds an archive entry to this archive.
 137  
      * <p>
 138  
      * This method is expected to be called from a single client thread</p>
 139  
      *
 140  
      * @param zipArchiveEntry The entry to add. Its compression method must be set.
 141  
      * @param source The source input stream supplier
 142  
      * @param addInParallel Indicates if the entry should be added in parallel.
 143  
      * If set to {@code false} the entry is added synchronously.
 144  
      *
 145  
      * @throws java.io.IOException
 146  
      */
 147  
     public void addArchiveEntry( final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source,
 148  
                                  final boolean addInParallel ) throws IOException
 149  
     {
 150  48410
         final int method = zipArchiveEntry.getMethod();
 151  48410
         if ( method == -1 )
 152  
         {
 153  0
             throw new IllegalArgumentException( "Method must be set on the supplied zipArchiveEntry" );
 154  
         }
 155  48410
         if ( zipArchiveEntry.isDirectory() && !zipArchiveEntry.isUnixSymlink() )
 156  
         {
 157  1009
             final ByteArrayInputStream payload = new ByteArrayInputStream( new byte[]
 158  
             {
 159  
             } );
 160  
 
 161  1009
             directories.addArchiveEntry( createZipArchiveEntryRequest( zipArchiveEntry, createInputStreamSupplier(
 162  
                                                                        payload ) ) );
 163  
 
 164  1009
             payload.close();
 165  1009
         }
 166  47401
         else if ( "META-INF".equals( zipArchiveEntry.getName() ) || "META-INF/".equals( zipArchiveEntry.getName() ) )
 167  
         {
 168  0
             InputStream payload = source.get();
 169  0
             if ( zipArchiveEntry.isDirectory() )
 170  
             {
 171  0
                 zipArchiveEntry.setMethod( ZipEntry.STORED );
 172  
             }
 173  0
             metaInfDir.addArchiveEntry( createZipArchiveEntryRequest( zipArchiveEntry,
 174  
                                                                       createInputStreamSupplier( payload ) ) );
 175  
 
 176  0
             payload.close();
 177  0
         }
 178  47401
         else if ( "META-INF/MANIFEST.MF".equals( zipArchiveEntry.getName() ) )
 179  
         {
 180  17
             InputStream payload = source.get();
 181  17
             if ( zipArchiveEntry.isDirectory() )
 182  
             {
 183  0
                 zipArchiveEntry.setMethod( ZipEntry.STORED );
 184  
             }
 185  17
             manifest.addArchiveEntry( createZipArchiveEntryRequest( zipArchiveEntry,
 186  
                                                                     createInputStreamSupplier( payload ) ) );
 187  
 
 188  17
             payload.close();
 189  17
         }
 190  47384
         else if ( addInParallel )
 191  
         {
 192  46944
             parallelScatterZipCreator.addArchiveEntry( createEntrySupplier( zipArchiveEntry, source ) );
 193  
         }
 194  
         else
 195  
         {
 196  440
             synchronousEntries.addArchiveEntry( createEntry( zipArchiveEntry, source ) );
 197  
         }
 198  48410
     }
 199  
 
 200  
     private InputStreamSupplier createInputStreamSupplier( final InputStream payload )
 201  
     {
 202  1026
         return new InputStreamSupplier()
 203  1026
         {
 204  
 
 205  
             @Override
 206  
             public InputStream get()
 207  
             {
 208  1026
                 return payload;
 209  
             }
 210  
 
 211  
         };
 212  
     }
 213  
 
 214  
     public void writeTo( ZipArchiveOutputStream targetStream ) throws IOException, ExecutionException,
 215  
                                                                       InterruptedException
 216  
     {
 217  52
         metaInfDir.writeTo( targetStream );
 218  52
         manifest.writeTo( targetStream );
 219  52
         directories.writeTo( targetStream );
 220  52
         synchronousEntries.writeTo( targetStream );
 221  52
         parallelScatterZipCreator.writeTo( targetStream );
 222  52
         long startAt = System.currentTimeMillis();
 223  52
         targetStream.close();
 224  52
         zipCloseElapsed = System.currentTimeMillis() - startAt;
 225  52
         metaInfDir.close();
 226  52
         manifest.close();
 227  52
         directories.close();
 228  52
         synchronousEntries.close();
 229  52
     }
 230  
 
 231  
     /**
 232  
      * Returns a message describing the overall statistics of the compression run
 233  
      *
 234  
      * @return The statistics message of the parallel zip creator followed by the time spent closing the zip stream
 235  
      */
 236  
     public String getStatisticsMessage()
 237  
     {
 238  0
         return parallelScatterZipCreator.getStatisticsMessage() + " Zip Close: " + zipCloseElapsed + "ms";
 239  
     }
 240  
 
 241  
     private ZipArchiveEntryRequestSupplier createEntrySupplier( final ZipArchiveEntry zipArchiveEntry,
 242  
                                                                 final InputStreamSupplier inputStreamSupplier )
 243  
     {
 244  
 
 245  46944
         return new ZipArchiveEntryRequestSupplier()
 246  46944
         {
 247  
 
 248  
             @Override
 249  
             public ZipArchiveEntryRequest get()
 250  
             {
 251  
                 try
 252  
                 {
 253  46944
                     return createEntry( zipArchiveEntry, inputStreamSupplier );
 254  
                 }
 255  0
                 catch ( IOException e )
 256  
                 {
 257  0
                     throw new RuntimeException( e );
 258  
                 }
 259  
             }
 260  
 
 261  
         };
 262  
     }
 263  
 
 264  
     private ZipArchiveEntryRequest createEntry( final ZipArchiveEntry zipArchiveEntry,
 265  
                                                 final InputStreamSupplier inputStreamSupplier ) throws IOException
 266  
     {
 267  
         // if we re-compress the zip files there is no need to look at the input stream
 268  
 
 269  47384
         if ( compressAddedZips )
 270  
         {
 271  47379
             return createZipArchiveEntryRequest( zipArchiveEntry, inputStreamSupplier );
 272  
         }
 273  
 
 274  
         // otherwise we should inspect the first four bytes to see if the input stream is a zip file or not
 275  
 
 276  5
         InputStream is = inputStreamSupplier.get();
 277  5
         byte[] header = new byte[4];
 278  
         try
 279  
         {
 280  5
             int read = is.read( header );
 281  5
             int compressionMethod = zipArchiveEntry.getMethod();
 282  5
             if ( isZipHeader( header ) ) {
 283  4
                 compressionMethod = ZipEntry.STORED;
 284  
             }
 285  
 
 286  5
             zipArchiveEntry.setMethod( compressionMethod );
 287  
 
 288  5
             return createZipArchiveEntryRequest( zipArchiveEntry, prependBytesToStream( header, read, is ) );
 289  
         }
 290  0
         catch ( IOException e )
 291  
         {
 292  0
             IOUtil.close( is );
 293  0
             throw e;
 294  
         }
 295  
     }
 296  
 
 297  
     private boolean isZipHeader( byte[] header )
 298  
     {
 299  5
         return header[0] == 0x50 && header[1] == 0x4b && header[2] == 3 && header[3] == 4;
 300  
     }
 301  
 
 302  
     private InputStreamSupplier prependBytesToStream( final byte[] bytes, final int len, final InputStream stream )
 303  
     {
 304  5
         return new InputStreamSupplier() {
 305  
 
 306  
             @Override
 307  
             public InputStream get()
 308  
             {
 309  5
                 return len > 0
 310  
                             ? new SequenceInputStream( new ByteArrayInputStream( bytes, 0, len ), stream )
 311  
                             : stream;
 312  
             }
 313  
 
 314  
         };
 315  
 
 316  
     }
 317  
 
 318  
 }