View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   *
17   */
18  package org.codehaus.plexus.archiver.zip;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.SequenceInputStream;
24  import java.io.UncheckedIOException;
25  import java.util.concurrent.ExecutionException;
26  import java.util.zip.Deflater;
27  import java.util.zip.ZipEntry;
28  
29  import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
30  import org.apache.commons.compress.archivers.zip.ScatterZipOutputStream;
31  import org.apache.commons.compress.archivers.zip.StreamCompressor;
32  import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
33  import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest;
34  import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
35  import org.apache.commons.compress.parallel.InputStreamSupplier;
36  import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
37  import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
38  import org.apache.commons.io.IOUtils;
39  import org.codehaus.plexus.archiver.util.Streams;
40  
41  import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
42  
43  public class ConcurrentJarCreator {
44  
45      private final boolean compressAddedZips;
46  
47      private final ScatterZipOutputStream metaInfDir;
48  
49      private final ScatterZipOutputStream manifest;
50  
51      private final ScatterZipOutputStream directories;
52  
53      private final ScatterZipOutputStream synchronousEntries;
54  
55      private final ParallelScatterZipCreator parallelScatterZipCreator;
56  
57      private long zipCloseElapsed;
58  
59      private static class DeferredSupplier implements ScatterGatherBackingStoreSupplier {
60  
61          private int threshold;
62  
63          DeferredSupplier(int threshold) {
64              this.threshold = threshold;
65          }
66  
67          @Override
68          public ScatterGatherBackingStore get() throws IOException {
69              return new DeferredScatterOutputStream(threshold);
70          }
71      }
72  
73      public static ScatterZipOutputStream createDeferred(
74              ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
75          ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
76          StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs);
77          return new ScatterZipOutputStream(bs, sc);
78      }
79  
80      /**
81       * Creates a new {@code ConcurrentJarCreator} instance.
82       * <p>
83       * {@code ConcurrentJarCreator} creates zip files using several concurrent threads.</p>
84       * <p>
85       * This constructor has the same effect as
86       * {@link #ConcurrentJarCreator(boolean, int) ConcurrentJarCreator(true, nThreads) }</p>
87       *
88       * @param nThreads The number of concurrent thread used to create the archive
89       * @throws IOException
90       */
91      public ConcurrentJarCreator(int nThreads) throws IOException {
92          this(true, nThreads);
93      }
94  
95      /**
96       * Creates a new {@code ConcurrentJarCreator} instance.
97       * <p>
98       * {@code ConcurrentJarCreator} creates zip files using several concurrent threads.
99       * Entries that are already zip file could be just stored or compressed again.</p>
100      *
101      * @param compressAddedZips Indicates if entries that are zip files should be compressed.
102      *                          If set to {@code false} entries that are zip files will be added using
103      *                          {@link ZipEntry#STORED} method.
104      *                          If set to {@code true} entries that are zip files will be added using
105      *                          the compression method indicated by the {@code ZipArchiveEntry} passed
106      *                          to {@link #addArchiveEntry(ZipArchiveEntry, InputStreamSupplier, boolean)}.
107      *                          The compression method for all entries that are not zip files will not be changed
108      *                          regardless of the value of this parameter
109      * @param nThreads The number of concurrent thread used to create the archive
110      *
111      * @throws IOException
112      */
113     public ConcurrentJarCreator(boolean compressAddedZips, int nThreads) throws IOException {
114         this.compressAddedZips = compressAddedZips;
115         ScatterGatherBackingStoreSupplier defaultSupplier = new DeferredSupplier(100000000 / nThreads);
116         metaInfDir = createDeferred(defaultSupplier);
117         manifest = createDeferred(defaultSupplier);
118         directories = createDeferred(defaultSupplier);
119         synchronousEntries = createDeferred(defaultSupplier);
120         parallelScatterZipCreator = new ParallelScatterZipCreator(
121                 ConcurrentJarCreatorExecutorServiceFactory.createExecutorService(nThreads), defaultSupplier);
122     }
123 
124     /**
125      * Adds an archive entry to this archive.
126      * <p>
127      * This method is expected to be called from a single client thread</p>
128      *
129      * @param zipArchiveEntry The entry to add. Compression method
130      * @param source The source input stream supplier
131      * @param addInParallel Indicates if the entry should be add in parallel.
132      * If set to {@code false} the entry is added synchronously.
133      *
134      * @throws java.io.IOException
135      */
136     public void addArchiveEntry(
137             final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source, final boolean addInParallel)
138             throws IOException {
139         final int method = zipArchiveEntry.getMethod();
140         if (method == -1) {
141             throw new IllegalArgumentException("Method must be set on the supplied zipArchiveEntry");
142         }
143         final String zipEntryName = zipArchiveEntry.getName();
144         if ("META-INF".equals(zipEntryName) || "META-INF/".equals(zipEntryName)) {
145             // TODO This should be enforced because META-INF non-directory does not make any sense?!
146             if (zipArchiveEntry.isDirectory()) {
147                 zipArchiveEntry.setMethod(ZipEntry.STORED);
148             }
149             metaInfDir.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
150         } else if ("META-INF/MANIFEST.MF".equals(zipEntryName)) {
151             manifest.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
152         } else if (zipArchiveEntry.isDirectory() && !zipArchiveEntry.isUnixSymlink()) {
153             directories.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, () -> Streams.EMPTY_INPUTSTREAM));
154         } else if (addInParallel) {
155             parallelScatterZipCreator.addArchiveEntry(() -> createEntry(zipArchiveEntry, source));
156         } else {
157             synchronousEntries.addArchiveEntry(createEntry(zipArchiveEntry, source));
158         }
159     }
160 
161     public void writeTo(ZipArchiveOutputStream targetStream)
162             throws IOException, ExecutionException, InterruptedException {
163         metaInfDir.writeTo(targetStream);
164         manifest.writeTo(targetStream);
165         directories.writeTo(targetStream);
166         synchronousEntries.writeTo(targetStream);
167         parallelScatterZipCreator.writeTo(targetStream);
168         long startAt = System.currentTimeMillis();
169         targetStream.close();
170         zipCloseElapsed = System.currentTimeMillis() - startAt;
171         metaInfDir.close();
172         manifest.close();
173         directories.close();
174         synchronousEntries.close();
175     }
176 
177     /**
178      * Returns a message describing the overall statistics of the compression run
179      *
180      * @return A string
181      */
182     public String getStatisticsMessage() {
183         return parallelScatterZipCreator.getStatisticsMessage() + " Zip Close: " + zipCloseElapsed + "ms";
184     }
185 
186     private ZipArchiveEntryRequest createEntry(
187             final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier inputStreamSupplier) {
188         // if we re-compress the zip files there is no need to look at the input stream
189         if (compressAddedZips) {
190             return createZipArchiveEntryRequest(zipArchiveEntry, inputStreamSupplier);
191         }
192 
193         InputStream is = inputStreamSupplier.get();
194         // otherwise we should inspect the first four bytes to see if the input stream is zip file or not
195         byte[] header = new byte[4];
196         try {
197             int read = is.read(header);
198             int compressionMethod = zipArchiveEntry.getMethod();
199             if (isZipHeader(header)) {
200                 compressionMethod = ZipEntry.STORED;
201             }
202 
203             zipArchiveEntry.setMethod(compressionMethod);
204 
205             return createZipArchiveEntryRequest(zipArchiveEntry, prependBytesToStream(header, read, is));
206         } catch (IOException e) {
207             IOUtils.closeQuietly(is);
208             throw new UncheckedIOException(e);
209         }
210     }
211 
212     private boolean isZipHeader(byte[] header) {
213         return header[0] == 0x50 && header[1] == 0x4b && header[2] == 3 && header[3] == 4;
214     }
215 
216     private InputStreamSupplier prependBytesToStream(final byte[] bytes, final int len, final InputStream stream) {
217         return () -> len > 0 ? new SequenceInputStream(new ByteArrayInputStream(bytes, 0, len), stream) : stream;
218     }
219 }