View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   *
17   */
18  package org.codehaus.plexus.archiver.zip;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.SequenceInputStream;
24  import java.io.UncheckedIOException;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  import java.util.zip.Deflater;
29  import java.util.zip.ZipEntry;
30  
31  import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
32  import org.apache.commons.compress.archivers.zip.ScatterZipOutputStream;
33  import org.apache.commons.compress.archivers.zip.StreamCompressor;
34  import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
35  import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest;
36  import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
37  import org.apache.commons.compress.parallel.InputStreamSupplier;
38  import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
39  import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
40  import org.apache.commons.io.IOUtils;
41  import org.codehaus.plexus.archiver.util.Streams;
42  
43  import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
44  
45  public class ConcurrentJarCreator {
46  
47      private final boolean compressAddedZips;
48  
49      private final ScatterZipOutputStream metaInfDir;
50  
51      private final ScatterZipOutputStream manifest;
52  
53      private final ScatterZipOutputStream directories;
54  
55      private final ScatterZipOutputStream synchronousEntries;
56  
57      private final ParallelScatterZipCreator parallelScatterZipCreator;
58  
59      private final ExecutorService es;
60  
61      private long zipCloseElapsed;
62  
63      private static class DeferredSupplier implements ScatterGatherBackingStoreSupplier {
64  
65          private int threshold;
66  
67          DeferredSupplier(int threshold) {
68              this.threshold = threshold;
69          }
70  
71          @Override
72          public ScatterGatherBackingStore get() throws IOException {
73              return new DeferredScatterOutputStream(threshold);
74          }
75      }
76  
77      public static ScatterZipOutputStream createDeferred(
78              ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
79          ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
80          StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs);
81          return new ScatterZipOutputStream(bs, sc);
82      }
83  
84      /**
85       * Creates a new {@code ConcurrentJarCreator} instance.
86       * <p>
87       * {@code ConcurrentJarCreator} creates zip files using several concurrent threads.</p>
88       * <p>
89       * This constructor has the same effect as
90       * {@link #ConcurrentJarCreator(boolean, int) ConcurrentJarCreator(true, nThreads) }</p>
91       *
92       * @param nThreads The number of concurrent thread used to create the archive
93       * @throws IOException
94       */
95      public ConcurrentJarCreator(int nThreads) throws IOException {
96          this(true, nThreads);
97      }
98  
99      /**
100      * Creates a new {@code ConcurrentJarCreator} instance.
101      * <p>
102      * {@code ConcurrentJarCreator} creates zip files using several concurrent threads.
103      * Entries that are already zip file could be just stored or compressed again.</p>
104      *
105      * @param compressAddedZips Indicates if entries that are zip files should be compressed.
106      *                          If set to {@code false} entries that are zip files will be added using
107      *                          {@link ZipEntry#STORED} method.
108      *                          If set to {@code true} entries that are zip files will be added using
109      *                          the compression method indicated by the {@code ZipArchiveEntry} passed
110      *                          to {@link #addArchiveEntry(ZipArchiveEntry, InputStreamSupplier, boolean)}.
111      *                          The compression method for all entries that are not zip files will not be changed
112      *                          regardless of the value of this parameter
113      * @param nThreads The number of concurrent thread used to create the archive
114      *
115      * @throws IOException
116      */
117     public ConcurrentJarCreator(boolean compressAddedZips, int nThreads) throws IOException {
118         this.compressAddedZips = compressAddedZips;
119         ScatterGatherBackingStoreSupplier defaultSupplier = new DeferredSupplier(10000000 / nThreads);
120         metaInfDir = createDeferred(defaultSupplier);
121         manifest = createDeferred(defaultSupplier);
122         directories = createDeferred(defaultSupplier);
123         synchronousEntries = createDeferred(defaultSupplier);
124         es = Executors.newFixedThreadPool(nThreads);
125         parallelScatterZipCreator = new ParallelScatterZipCreator(es, defaultSupplier);
126     }
127 
128     /**
129      * Adds an archive entry to this archive.
130      * <p>
131      * This method is expected to be called from a single client thread</p>
132      *
133      * @param zipArchiveEntry The entry to add. Compression method
134      * @param source The source input stream supplier
135      * @param addInParallel Indicates if the entry should be add in parallel.
136      * If set to {@code false} the entry is added synchronously.
137      *
138      * @throws java.io.IOException
139      */
140     public void addArchiveEntry(
141             final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source, final boolean addInParallel)
142             throws IOException {
143         final int method = zipArchiveEntry.getMethod();
144         if (method == -1) {
145             throw new IllegalArgumentException("Method must be set on the supplied zipArchiveEntry");
146         }
147         final String zipEntryName = zipArchiveEntry.getName();
148         if ("META-INF".equals(zipEntryName) || "META-INF/".equals(zipEntryName)) {
149             // TODO This should be enforced because META-INF non-directory does not make any sense?!
150             if (zipArchiveEntry.isDirectory()) {
151                 zipArchiveEntry.setMethod(ZipEntry.STORED);
152             }
153             metaInfDir.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
154         } else if ("META-INF/MANIFEST.MF".equals(zipEntryName)) {
155             manifest.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
156         } else if (zipArchiveEntry.isDirectory() && !zipArchiveEntry.isUnixSymlink()) {
157             directories.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, () -> Streams.EMPTY_INPUTSTREAM));
158         } else if (addInParallel) {
159             parallelScatterZipCreator.addArchiveEntry(() -> createEntry(zipArchiveEntry, source));
160         } else {
161             synchronousEntries.addArchiveEntry(createEntry(zipArchiveEntry, source));
162         }
163     }
164 
165     public void writeTo(ZipArchiveOutputStream targetStream)
166             throws IOException, ExecutionException, InterruptedException {
167         try {
168             metaInfDir.writeTo(targetStream);
169             manifest.writeTo(targetStream);
170             directories.writeTo(targetStream);
171             synchronousEntries.writeTo(targetStream);
172             parallelScatterZipCreator.writeTo(targetStream);
173         } finally {
174             es.shutdown();
175         }
176         long startAt = System.currentTimeMillis();
177         targetStream.close();
178         zipCloseElapsed = System.currentTimeMillis() - startAt;
179         metaInfDir.close();
180         manifest.close();
181         directories.close();
182         synchronousEntries.close();
183     }
184 
185     /**
186      * Returns a message describing the overall statistics of the compression run
187      *
188      * @return A string
189      */
190     public String getStatisticsMessage() {
191         return parallelScatterZipCreator.getStatisticsMessage() + " Zip Close: " + zipCloseElapsed + "ms";
192     }
193 
194     private ZipArchiveEntryRequest createEntry(
195             final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier inputStreamSupplier) {
196         // if we re-compress the zip files there is no need to look at the input stream
197         if (compressAddedZips) {
198             return createZipArchiveEntryRequest(zipArchiveEntry, inputStreamSupplier);
199         }
200 
201         InputStream is = inputStreamSupplier.get();
202         // otherwise we should inspect the first four bytes to see if the input stream is zip file or not
203         byte[] header = new byte[4];
204         try {
205             int read = is.read(header);
206             int compressionMethod = zipArchiveEntry.getMethod();
207             if (isZipHeader(header)) {
208                 compressionMethod = ZipEntry.STORED;
209             }
210 
211             zipArchiveEntry.setMethod(compressionMethod);
212 
213             return createZipArchiveEntryRequest(zipArchiveEntry, prependBytesToStream(header, read, is));
214         } catch (IOException e) {
215             IOUtils.closeQuietly(is);
216             throw new UncheckedIOException(e);
217         }
218     }
219 
220     private boolean isZipHeader(byte[] header) {
221         return header[0] == 0x50 && header[1] == 0x4b && header[2] == 3 && header[3] == 4;
222     }
223 
224     private InputStreamSupplier prependBytesToStream(final byte[] bytes, final int len, final InputStream stream) {
225         return () -> len > 0 ? new SequenceInputStream(new ByteArrayInputStream(bytes, 0, len), stream) : stream;
226     }
227 }