View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   *
17   */
18  package org.codehaus.plexus.archiver.zip;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.SequenceInputStream;
24  import java.util.concurrent.ExecutionException;
25  import java.util.concurrent.Executors;
26  import java.util.zip.Deflater;
27  import java.util.zip.ZipEntry;
28  import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
29  import org.apache.commons.compress.archivers.zip.ScatterZipOutputStream;
30  import org.apache.commons.compress.archivers.zip.StreamCompressor;
31  import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
32  import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest;
33  import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequestSupplier;
34  import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
35  import org.apache.commons.compress.parallel.InputStreamSupplier;
36  import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
37  import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
38  import org.codehaus.plexus.util.IOUtil;
39  
40  import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
41  
42  public class ConcurrentJarCreator
43  {
44  
45      private final boolean compressAddedZips;
46  
47      private final ScatterZipOutputStream directories;
48  
49      private final ScatterZipOutputStream metaInfDir;
50  
51      private final ScatterZipOutputStream manifest;
52  
53      private final ScatterZipOutputStream synchronousEntries;
54  
55      private final ParallelScatterZipCreator parallelScatterZipCreator;
56  
57      private long zipCloseElapsed;
58  
59      private static class DeferredSupplier
60          implements ScatterGatherBackingStoreSupplier
61      {
62  
63          private int threshold;
64  
65          DeferredSupplier( int threshold )
66          {
67              this.threshold = threshold;
68          }
69  
70          public ScatterGatherBackingStore get()
71              throws IOException
72          {
73              return new DeferredScatterOutputStream( threshold );
74          }
75  
76      }
77  
78      public static ScatterZipOutputStream createDeferred(
79          ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier )
80          throws IOException
81      {
82          ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
83          StreamCompressor sc = StreamCompressor.create( Deflater.DEFAULT_COMPRESSION, bs );
84          return new ScatterZipOutputStream( bs, sc );
85      }
86  
87      /**
88       * Creates a new {@code ConcurrentJarCreator} instance.
89       * <p>
90       * {@code ConcurrentJarCreator} creates zip files using several concurrent threads.</p>
91       * <p>
92       * This constructor has the same effect as
93       * {@link #ConcurrentJarCreator(boolean, int) ConcurrentJarCreator(true, nThreads) }</p>
94       *
95       * @param nThreads The number of concurrent thread used to create the archive
96       *
97       * @throws IOException
98       */
99      public ConcurrentJarCreator( int nThreads ) throws IOException
100     {
101         this( true, nThreads );
102     }
103 
104     /**
105      * Creates a new {@code ConcurrentJarCreator} instance.
106      * <p>
107      * {@code ConcurrentJarCreator} creates zip files using several concurrent threads.
108      * Entries that are already zip file could be just stored or compressed again.</p>
109      *
110      * @param compressAddedZips Indicates if entries that are zip files should be compressed.
111      *                          If set to {@code false} entries that are zip files will be added using
112      *                          {@link ZipEntry#STORED} method.
113      *                          If set to {@code true} entries that are zip files will be added using
114      *                          the compression method indicated by the {@code ZipArchiveEntry} passed
115      *                          to {@link #addArchiveEntry(ZipArchiveEntry, InputStreamSupplier, boolean)}.
116      *                          The compression method for all entries that are not zip files will not be changed
117      *                          regardless of the value of this parameter
118      * @param nThreads The number of concurrent thread used to create the archive
119      *
120      * @throws IOException
121      */
122     public ConcurrentJarCreator( boolean compressAddedZips, int nThreads ) throws IOException
123     {
124         this.compressAddedZips = compressAddedZips;
125         ScatterGatherBackingStoreSupplier defaultSupplier = new DeferredSupplier( 100000000 / nThreads );
126         directories = createDeferred( defaultSupplier );
127         manifest = createDeferred( defaultSupplier );
128         metaInfDir = createDeferred( defaultSupplier );
129         synchronousEntries = createDeferred( defaultSupplier );
130         parallelScatterZipCreator = new ParallelScatterZipCreator( Executors.newFixedThreadPool( nThreads ),
131                                                                    defaultSupplier );
132 
133     }
134 
135     /**
136      * Adds an archive entry to this archive.
137      * <p>
138      * This method is expected to be called from a single client thread</p>
139      *
140      * @param zipArchiveEntry The entry to add. Compression method
141      * @param source The source input stream supplier
142      * @param addInParallel Indicates if the entry should be add in parallel.
143      * If set to {@code false} the entry is added synchronously.
144      *
145      * @throws java.io.IOException
146      */
147     public void addArchiveEntry( final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source,
148                                  final boolean addInParallel ) throws IOException
149     {
150         final int method = zipArchiveEntry.getMethod();
151         if ( method == -1 )
152         {
153             throw new IllegalArgumentException( "Method must be set on the supplied zipArchiveEntry" );
154         }
155         if ( zipArchiveEntry.isDirectory() && !zipArchiveEntry.isUnixSymlink() )
156         {
157             final ByteArrayInputStream payload = new ByteArrayInputStream( new byte[]
158             {
159             } );
160 
161             directories.addArchiveEntry( createZipArchiveEntryRequest( zipArchiveEntry, createInputStreamSupplier(
162                                                                        payload ) ) );
163 
164             payload.close();
165         }
166         else if ( "META-INF".equals( zipArchiveEntry.getName() ) || "META-INF/".equals( zipArchiveEntry.getName() ) )
167         {
168             InputStream payload = source.get();
169             if ( zipArchiveEntry.isDirectory() )
170             {
171                 zipArchiveEntry.setMethod( ZipEntry.STORED );
172             }
173             metaInfDir.addArchiveEntry( createZipArchiveEntryRequest( zipArchiveEntry,
174                                                                       createInputStreamSupplier( payload ) ) );
175 
176             payload.close();
177         }
178         else if ( "META-INF/MANIFEST.MF".equals( zipArchiveEntry.getName() ) )
179         {
180             InputStream payload = source.get();
181             if ( zipArchiveEntry.isDirectory() )
182             {
183                 zipArchiveEntry.setMethod( ZipEntry.STORED );
184             }
185             manifest.addArchiveEntry( createZipArchiveEntryRequest( zipArchiveEntry,
186                                                                     createInputStreamSupplier( payload ) ) );
187 
188             payload.close();
189         }
190         else if ( addInParallel )
191         {
192             parallelScatterZipCreator.addArchiveEntry( createEntrySupplier( zipArchiveEntry, source ) );
193         }
194         else
195         {
196             synchronousEntries.addArchiveEntry( createEntry( zipArchiveEntry, source ) );
197         }
198     }
199 
200     private InputStreamSupplier createInputStreamSupplier( final InputStream payload )
201     {
202         return new InputStreamSupplier()
203         {
204 
205             @Override
206             public InputStream get()
207             {
208                 return payload;
209             }
210 
211         };
212     }
213 
214     public void writeTo( ZipArchiveOutputStream targetStream ) throws IOException, ExecutionException,
215                                                                       InterruptedException
216     {
217         metaInfDir.writeTo( targetStream );
218         manifest.writeTo( targetStream );
219         directories.writeTo( targetStream );
220         synchronousEntries.writeTo( targetStream );
221         parallelScatterZipCreator.writeTo( targetStream );
222         long startAt = System.currentTimeMillis();
223         targetStream.close();
224         zipCloseElapsed = System.currentTimeMillis() - startAt;
225         metaInfDir.close();
226         manifest.close();
227         directories.close();
228         synchronousEntries.close();
229     }
230 
231     /**
232      * Returns a message describing the overall statistics of the compression run
233      *
234      * @return A string
235      */
236     public String getStatisticsMessage()
237     {
238         return parallelScatterZipCreator.getStatisticsMessage() + " Zip Close: " + zipCloseElapsed + "ms";
239     }
240 
241     private ZipArchiveEntryRequestSupplier createEntrySupplier( final ZipArchiveEntry zipArchiveEntry,
242                                                                 final InputStreamSupplier inputStreamSupplier )
243     {
244 
245         return new ZipArchiveEntryRequestSupplier()
246         {
247 
248             @Override
249             public ZipArchiveEntryRequest get()
250             {
251                 try
252                 {
253                     return createEntry( zipArchiveEntry, inputStreamSupplier );
254                 }
255                 catch ( IOException e )
256                 {
257                     throw new RuntimeException( e );
258                 }
259             }
260 
261         };
262     }
263 
264     private ZipArchiveEntryRequest createEntry( final ZipArchiveEntry zipArchiveEntry,
265                                                 final InputStreamSupplier inputStreamSupplier ) throws IOException
266     {
267         // if we re-compress the zip files there is no need to look at the input stream
268 
269         if ( compressAddedZips )
270         {
271             return createZipArchiveEntryRequest( zipArchiveEntry, inputStreamSupplier );
272         }
273 
274         // otherwise we should inspect the first four bites to see if the input stream is zip file or not
275 
276         InputStream is = inputStreamSupplier.get();
277         byte[] header = new byte[4];
278         try
279         {
280             int read = is.read( header );
281             int compressionMethod = zipArchiveEntry.getMethod();
282             if ( isZipHeader( header ) ) {
283                 compressionMethod = ZipEntry.STORED;
284             }
285 
286             zipArchiveEntry.setMethod( compressionMethod );
287 
288             return createZipArchiveEntryRequest( zipArchiveEntry, prependBytesToStream( header, read, is ) );
289         }
290         catch ( IOException e )
291         {
292             IOUtil.close( is );
293             throw e;
294         }
295     }
296 
297     private boolean isZipHeader( byte[] header )
298     {
299         return header[0] == 0x50 && header[1] == 0x4b && header[2] == 3 && header[3] == 4;
300     }
301 
302     private InputStreamSupplier prependBytesToStream( final byte[] bytes, final int len, final InputStream stream )
303     {
304         return new InputStreamSupplier() {
305 
306             @Override
307             public InputStream get()
308             {
309                 return len > 0
310                             ? new SequenceInputStream( new ByteArrayInputStream( bytes, 0, len ), stream )
311                             : stream;
312             }
313 
314         };
315 
316     }
317 
318 }