/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */
18  package org.codehaus.plexus.archiver.zip;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.SequenceInputStream;
24  import java.io.UncheckedIOException;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.Executors;
27  import java.util.zip.Deflater;
28  import java.util.zip.ZipEntry;
29  
30  import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
31  import org.apache.commons.compress.archivers.zip.ScatterZipOutputStream;
32  import org.apache.commons.compress.archivers.zip.StreamCompressor;
33  import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
34  import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest;
35  import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
36  import org.apache.commons.compress.parallel.InputStreamSupplier;
37  import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
38  import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
39  import org.apache.commons.compress.utils.IOUtils;
40  import org.codehaus.plexus.archiver.util.Streams;
41  
42  import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
43  
44  public class ConcurrentJarCreator {
45  
46      private final boolean compressAddedZips;
47  
48      private final ScatterZipOutputStream metaInfDir;
49  
50      private final ScatterZipOutputStream manifest;
51  
52      private final ScatterZipOutputStream directories;
53  
54      private final ScatterZipOutputStream synchronousEntries;
55  
56      private final ParallelScatterZipCreator parallelScatterZipCreator;
57  
58      private long zipCloseElapsed;
59  
60      private static class DeferredSupplier implements ScatterGatherBackingStoreSupplier {
61  
62          private int threshold;
63  
64          DeferredSupplier(int threshold) {
65              this.threshold = threshold;
66          }
67  
68          @Override
69          public ScatterGatherBackingStore get() throws IOException {
70              return new DeferredScatterOutputStream(threshold);
71          }
72      }
73  
74      public static ScatterZipOutputStream createDeferred(
75              ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
76          ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
77          StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs);
78          return new ScatterZipOutputStream(bs, sc);
79      }
80  
81      /**
82       * Creates a new {@code ConcurrentJarCreator} instance.
83       * <p>
84       * {@code ConcurrentJarCreator} creates zip files using several concurrent threads.</p>
85       * <p>
86       * This constructor has the same effect as
87       * {@link #ConcurrentJarCreator(boolean, int) ConcurrentJarCreator(true, nThreads) }</p>
88       *
89       * @param nThreads The number of concurrent thread used to create the archive
90       *
91       * @throws IOException
92       */
93      public ConcurrentJarCreator(int nThreads) throws IOException {
94          this(true, nThreads);
95      }
96  
97      /**
98       * Creates a new {@code ConcurrentJarCreator} instance.
99       * <p>
100      * {@code ConcurrentJarCreator} creates zip files using several concurrent threads.
101      * Entries that are already zip file could be just stored or compressed again.</p>
102      *
103      * @param compressAddedZips Indicates if entries that are zip files should be compressed.
104      *                          If set to {@code false} entries that are zip files will be added using
105      *                          {@link ZipEntry#STORED} method.
106      *                          If set to {@code true} entries that are zip files will be added using
107      *                          the compression method indicated by the {@code ZipArchiveEntry} passed
108      *                          to {@link #addArchiveEntry(ZipArchiveEntry, InputStreamSupplier, boolean)}.
109      *                          The compression method for all entries that are not zip files will not be changed
110      *                          regardless of the value of this parameter
111      * @param nThreads The number of concurrent thread used to create the archive
112      *
113      * @throws IOException
114      */
115     public ConcurrentJarCreator(boolean compressAddedZips, int nThreads) throws IOException {
116         this.compressAddedZips = compressAddedZips;
117         ScatterGatherBackingStoreSupplier defaultSupplier = new DeferredSupplier(100000000 / nThreads);
118         metaInfDir = createDeferred(defaultSupplier);
119         manifest = createDeferred(defaultSupplier);
120         directories = createDeferred(defaultSupplier);
121         synchronousEntries = createDeferred(defaultSupplier);
122         parallelScatterZipCreator =
123                 new ParallelScatterZipCreator(Executors.newFixedThreadPool(nThreads), defaultSupplier);
124     }
125 
126     /**
127      * Adds an archive entry to this archive.
128      * <p>
129      * This method is expected to be called from a single client thread</p>
130      *
131      * @param zipArchiveEntry The entry to add. Compression method
132      * @param source The source input stream supplier
133      * @param addInParallel Indicates if the entry should be add in parallel.
134      * If set to {@code false} the entry is added synchronously.
135      *
136      * @throws java.io.IOException
137      */
138     public void addArchiveEntry(
139             final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source, final boolean addInParallel)
140             throws IOException {
141         final int method = zipArchiveEntry.getMethod();
142         if (method == -1) {
143             throw new IllegalArgumentException("Method must be set on the supplied zipArchiveEntry");
144         }
145         final String zipEntryName = zipArchiveEntry.getName();
146         if ("META-INF".equals(zipEntryName) || "META-INF/".equals(zipEntryName)) {
147             // TODO This should be enforced because META-INF non-directory does not make any sense?!
148             if (zipArchiveEntry.isDirectory()) {
149                 zipArchiveEntry.setMethod(ZipEntry.STORED);
150             }
151             metaInfDir.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
152         } else if ("META-INF/MANIFEST.MF".equals(zipEntryName)) {
153             manifest.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
154         } else if (zipArchiveEntry.isDirectory() && !zipArchiveEntry.isUnixSymlink()) {
155             directories.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, () -> Streams.EMPTY_INPUTSTREAM));
156         } else if (addInParallel) {
157             parallelScatterZipCreator.addArchiveEntry(() -> createEntry(zipArchiveEntry, source));
158         } else {
159             synchronousEntries.addArchiveEntry(createEntry(zipArchiveEntry, source));
160         }
161     }
162 
163     public void writeTo(ZipArchiveOutputStream targetStream)
164             throws IOException, ExecutionException, InterruptedException {
165         metaInfDir.writeTo(targetStream);
166         manifest.writeTo(targetStream);
167         directories.writeTo(targetStream);
168         synchronousEntries.writeTo(targetStream);
169         parallelScatterZipCreator.writeTo(targetStream);
170         long startAt = System.currentTimeMillis();
171         targetStream.close();
172         zipCloseElapsed = System.currentTimeMillis() - startAt;
173         metaInfDir.close();
174         manifest.close();
175         directories.close();
176         synchronousEntries.close();
177     }
178 
179     /**
180      * Returns a message describing the overall statistics of the compression run
181      *
182      * @return A string
183      */
184     public String getStatisticsMessage() {
185         return parallelScatterZipCreator.getStatisticsMessage() + " Zip Close: " + zipCloseElapsed + "ms";
186     }
187 
188     private ZipArchiveEntryRequest createEntry(
189             final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier inputStreamSupplier) {
190         // if we re-compress the zip files there is no need to look at the input stream
191         if (compressAddedZips) {
192             return createZipArchiveEntryRequest(zipArchiveEntry, inputStreamSupplier);
193         }
194 
195         InputStream is = inputStreamSupplier.get();
196         // otherwise we should inspect the first four bytes to see if the input stream is zip file or not
197         byte[] header = new byte[4];
198         try {
199             int read = is.read(header);
200             int compressionMethod = zipArchiveEntry.getMethod();
201             if (isZipHeader(header)) {
202                 compressionMethod = ZipEntry.STORED;
203             }
204 
205             zipArchiveEntry.setMethod(compressionMethod);
206 
207             return createZipArchiveEntryRequest(zipArchiveEntry, prependBytesToStream(header, read, is));
208         } catch (IOException e) {
209             IOUtils.closeQuietly(is);
210             throw new UncheckedIOException(e);
211         }
212     }
213 
214     private boolean isZipHeader(byte[] header) {
215         return header[0] == 0x50 && header[1] == 0x4b && header[2] == 3 && header[3] == 4;
216     }
217 
218     private InputStreamSupplier prependBytesToStream(final byte[] bytes, final int len, final InputStream stream) {
219         return () -> len > 0 ? new SequenceInputStream(new ByteArrayInputStream(bytes, 0, len), stream) : stream;
220     }
221 }