View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   *
17   */
18  package org.codehaus.plexus.archiver.zip;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.SequenceInputStream;
24  import java.io.UncheckedIOException;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.Executors;
27  import java.util.zip.Deflater;
28  import java.util.zip.ZipEntry;
29  
30  import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
31  import org.apache.commons.compress.archivers.zip.ScatterZipOutputStream;
32  import org.apache.commons.compress.archivers.zip.StreamCompressor;
33  import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
34  import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest;
35  import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
36  import org.apache.commons.compress.parallel.InputStreamSupplier;
37  import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
38  import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
39  import org.apache.commons.io.IOUtils;
40  import org.codehaus.plexus.archiver.util.Streams;
41  
42  import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
43  
44  public class ConcurrentJarCreator {
45  
46      private final boolean compressAddedZips;
47  
48      private final ScatterZipOutputStream metaInfDir;
49  
50      private final ScatterZipOutputStream manifest;
51  
52      private final ScatterZipOutputStream directories;
53  
54      private final ScatterZipOutputStream synchronousEntries;
55  
56      private final ParallelScatterZipCreator parallelScatterZipCreator;
57  
58      private long zipCloseElapsed;
59  
60      private static class DeferredSupplier implements ScatterGatherBackingStoreSupplier {
61  
62          private int threshold;
63  
64          DeferredSupplier(int threshold) {
65              this.threshold = threshold;
66          }
67  
68          @Override
69          public ScatterGatherBackingStore get() throws IOException {
70              return new DeferredScatterOutputStream(threshold);
71          }
72      }
73  
74      public static ScatterZipOutputStream createDeferred(
75              ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
76          ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
77          StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs);
78          return new ScatterZipOutputStream(bs, sc);
79      }
80  
81      /**
82       * Creates a new {@code ConcurrentJarCreator} instance.
83       * <p>
84       * {@code ConcurrentJarCreator} creates zip files using several concurrent threads.</p>
85       * <p>
86       * This constructor has the same effect as
87       * {@link #ConcurrentJarCreator(boolean, int) ConcurrentJarCreator(true, nThreads) }</p>
88       *
89       * @param nThreads The number of concurrent thread used to create the archive
90       * @throws IOException
91       */
92      public ConcurrentJarCreator(int nThreads) throws IOException {
93          this(true, nThreads);
94      }
95  
96      /**
97       * Creates a new {@code ConcurrentJarCreator} instance.
98       * <p>
99       * {@code ConcurrentJarCreator} creates zip files using several concurrent threads.
100      * Entries that are already zip file could be just stored or compressed again.</p>
101      *
102      * @param compressAddedZips Indicates if entries that are zip files should be compressed.
103      *                          If set to {@code false} entries that are zip files will be added using
104      *                          {@link ZipEntry#STORED} method.
105      *                          If set to {@code true} entries that are zip files will be added using
106      *                          the compression method indicated by the {@code ZipArchiveEntry} passed
107      *                          to {@link #addArchiveEntry(ZipArchiveEntry, InputStreamSupplier, boolean)}.
108      *                          The compression method for all entries that are not zip files will not be changed
109      *                          regardless of the value of this parameter
110      * @param nThreads The number of concurrent thread used to create the archive
111      *
112      * @throws IOException
113      */
114     public ConcurrentJarCreator(boolean compressAddedZips, int nThreads) throws IOException {
115         this.compressAddedZips = compressAddedZips;
116         ScatterGatherBackingStoreSupplier defaultSupplier = new DeferredSupplier(10000000 / nThreads);
117         metaInfDir = createDeferred(defaultSupplier);
118         manifest = createDeferred(defaultSupplier);
119         directories = createDeferred(defaultSupplier);
120         synchronousEntries = createDeferred(defaultSupplier);
121         parallelScatterZipCreator =
122                 new ParallelScatterZipCreator(Executors.newFixedThreadPool(nThreads), defaultSupplier);
123     }
124 
125     /**
126      * Adds an archive entry to this archive.
127      * <p>
128      * This method is expected to be called from a single client thread</p>
129      *
130      * @param zipArchiveEntry The entry to add. Compression method
131      * @param source The source input stream supplier
132      * @param addInParallel Indicates if the entry should be add in parallel.
133      * If set to {@code false} the entry is added synchronously.
134      *
135      * @throws java.io.IOException
136      */
137     public void addArchiveEntry(
138             final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source, final boolean addInParallel)
139             throws IOException {
140         final int method = zipArchiveEntry.getMethod();
141         if (method == -1) {
142             throw new IllegalArgumentException("Method must be set on the supplied zipArchiveEntry");
143         }
144         final String zipEntryName = zipArchiveEntry.getName();
145         if ("META-INF".equals(zipEntryName) || "META-INF/".equals(zipEntryName)) {
146             // TODO This should be enforced because META-INF non-directory does not make any sense?!
147             if (zipArchiveEntry.isDirectory()) {
148                 zipArchiveEntry.setMethod(ZipEntry.STORED);
149             }
150             metaInfDir.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
151         } else if ("META-INF/MANIFEST.MF".equals(zipEntryName)) {
152             manifest.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
153         } else if (zipArchiveEntry.isDirectory() && !zipArchiveEntry.isUnixSymlink()) {
154             directories.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, () -> Streams.EMPTY_INPUTSTREAM));
155         } else if (addInParallel) {
156             parallelScatterZipCreator.addArchiveEntry(() -> createEntry(zipArchiveEntry, source));
157         } else {
158             synchronousEntries.addArchiveEntry(createEntry(zipArchiveEntry, source));
159         }
160     }
161 
162     public void writeTo(ZipArchiveOutputStream targetStream)
163             throws IOException, ExecutionException, InterruptedException {
164         metaInfDir.writeTo(targetStream);
165         manifest.writeTo(targetStream);
166         directories.writeTo(targetStream);
167         synchronousEntries.writeTo(targetStream);
168         parallelScatterZipCreator.writeTo(targetStream);
169         long startAt = System.currentTimeMillis();
170         targetStream.close();
171         zipCloseElapsed = System.currentTimeMillis() - startAt;
172         metaInfDir.close();
173         manifest.close();
174         directories.close();
175         synchronousEntries.close();
176     }
177 
178     /**
179      * Returns a message describing the overall statistics of the compression run
180      *
181      * @return A string
182      */
183     public String getStatisticsMessage() {
184         return parallelScatterZipCreator.getStatisticsMessage() + " Zip Close: " + zipCloseElapsed + "ms";
185     }
186 
187     private ZipArchiveEntryRequest createEntry(
188             final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier inputStreamSupplier) {
189         // if we re-compress the zip files there is no need to look at the input stream
190         if (compressAddedZips) {
191             return createZipArchiveEntryRequest(zipArchiveEntry, inputStreamSupplier);
192         }
193 
194         InputStream is = inputStreamSupplier.get();
195         // otherwise we should inspect the first four bytes to see if the input stream is zip file or not
196         byte[] header = new byte[4];
197         try {
198             int read = is.read(header);
199             int compressionMethod = zipArchiveEntry.getMethod();
200             if (isZipHeader(header)) {
201                 compressionMethod = ZipEntry.STORED;
202             }
203 
204             zipArchiveEntry.setMethod(compressionMethod);
205 
206             return createZipArchiveEntryRequest(zipArchiveEntry, prependBytesToStream(header, read, is));
207         } catch (IOException e) {
208             IOUtils.closeQuietly(is);
209             throw new UncheckedIOException(e);
210         }
211     }
212 
213     private boolean isZipHeader(byte[] header) {
214         return header[0] == 0x50 && header[1] == 0x4b && header[2] == 3 && header[3] == 4;
215     }
216 
217     private InputStreamSupplier prependBytesToStream(final byte[] bytes, final int len, final InputStream stream) {
218         return () -> len > 0 ? new SequenceInputStream(new ByteArrayInputStream(bytes, 0, len), stream) : stream;
219     }
220 }