1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.codehaus.plexus.archiver.zip;
19
20 import java.io.ByteArrayInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.SequenceInputStream;
24 import java.io.UncheckedIOException;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.zip.Deflater;
29 import java.util.zip.ZipEntry;
30
31 import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
32 import org.apache.commons.compress.archivers.zip.ScatterZipOutputStream;
33 import org.apache.commons.compress.archivers.zip.StreamCompressor;
34 import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
35 import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest;
36 import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
37 import org.apache.commons.compress.parallel.InputStreamSupplier;
38 import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
39 import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
40 import org.apache.commons.io.IOUtils;
41 import org.codehaus.plexus.archiver.util.Streams;
42
43 import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
44
45 public class ConcurrentJarCreator {
46
47 private final boolean compressAddedZips;
48
49 private final ScatterZipOutputStream metaInfDir;
50
51 private final ScatterZipOutputStream manifest;
52
53 private final ScatterZipOutputStream directories;
54
55 private final ScatterZipOutputStream synchronousEntries;
56
57 private final ParallelScatterZipCreator parallelScatterZipCreator;
58
59 private final ExecutorService es;
60
61 private long zipCloseElapsed;
62
63 private static class DeferredSupplier implements ScatterGatherBackingStoreSupplier {
64
65 private int threshold;
66
67 DeferredSupplier(int threshold) {
68 this.threshold = threshold;
69 }
70
71 @Override
72 public ScatterGatherBackingStore get() throws IOException {
73 return new DeferredScatterOutputStream(threshold);
74 }
75 }
76
77 public static ScatterZipOutputStream createDeferred(
78 ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
79 ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
80 StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs);
81 return new ScatterZipOutputStream(bs, sc);
82 }
83
84
85
86
87
88
89
90
91
92
93
94
95 public ConcurrentJarCreator(int nThreads) throws IOException {
96 this(true, nThreads);
97 }
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117 public ConcurrentJarCreator(boolean compressAddedZips, int nThreads) throws IOException {
118 this.compressAddedZips = compressAddedZips;
119 ScatterGatherBackingStoreSupplier defaultSupplier = new DeferredSupplier(10000000 / nThreads);
120 metaInfDir = createDeferred(defaultSupplier);
121 manifest = createDeferred(defaultSupplier);
122 directories = createDeferred(defaultSupplier);
123 synchronousEntries = createDeferred(defaultSupplier);
124 es = Executors.newFixedThreadPool(nThreads);
125 parallelScatterZipCreator = new ParallelScatterZipCreator(es, defaultSupplier);
126 }
127
128
129
130
131
132
133
134
135
136
137
138
139
140 public void addArchiveEntry(
141 final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source, final boolean addInParallel)
142 throws IOException {
143 final int method = zipArchiveEntry.getMethod();
144 if (method == -1) {
145 throw new IllegalArgumentException("Method must be set on the supplied zipArchiveEntry");
146 }
147 final String zipEntryName = zipArchiveEntry.getName();
148 if ("META-INF".equals(zipEntryName) || "META-INF/".equals(zipEntryName)) {
149
150 if (zipArchiveEntry.isDirectory()) {
151 zipArchiveEntry.setMethod(ZipEntry.STORED);
152 }
153 metaInfDir.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
154 } else if ("META-INF/MANIFEST.MF".equals(zipEntryName)) {
155 manifest.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
156 } else if (zipArchiveEntry.isDirectory() && !zipArchiveEntry.isUnixSymlink()) {
157 directories.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, () -> Streams.EMPTY_INPUTSTREAM));
158 } else if (addInParallel) {
159 parallelScatterZipCreator.addArchiveEntry(() -> createEntry(zipArchiveEntry, source));
160 } else {
161 synchronousEntries.addArchiveEntry(createEntry(zipArchiveEntry, source));
162 }
163 }
164
165 public void writeTo(ZipArchiveOutputStream targetStream)
166 throws IOException, ExecutionException, InterruptedException {
167 try {
168 metaInfDir.writeTo(targetStream);
169 manifest.writeTo(targetStream);
170 directories.writeTo(targetStream);
171 synchronousEntries.writeTo(targetStream);
172 parallelScatterZipCreator.writeTo(targetStream);
173 } finally {
174 es.shutdown();
175 }
176 long startAt = System.currentTimeMillis();
177 targetStream.close();
178 zipCloseElapsed = System.currentTimeMillis() - startAt;
179 metaInfDir.close();
180 manifest.close();
181 directories.close();
182 synchronousEntries.close();
183 }
184
185
186
187
188
189
190 public String getStatisticsMessage() {
191 return parallelScatterZipCreator.getStatisticsMessage() + " Zip Close: " + zipCloseElapsed + "ms";
192 }
193
194 private ZipArchiveEntryRequest createEntry(
195 final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier inputStreamSupplier) {
196
197 if (compressAddedZips) {
198 return createZipArchiveEntryRequest(zipArchiveEntry, inputStreamSupplier);
199 }
200
201 InputStream is = inputStreamSupplier.get();
202
203 byte[] header = new byte[4];
204 try {
205 int read = is.read(header);
206 int compressionMethod = zipArchiveEntry.getMethod();
207 if (isZipHeader(header)) {
208 compressionMethod = ZipEntry.STORED;
209 }
210
211 zipArchiveEntry.setMethod(compressionMethod);
212
213 return createZipArchiveEntryRequest(zipArchiveEntry, prependBytesToStream(header, read, is));
214 } catch (IOException e) {
215 IOUtils.closeQuietly(is);
216 throw new UncheckedIOException(e);
217 }
218 }
219
220 private boolean isZipHeader(byte[] header) {
221 return header[0] == 0x50 && header[1] == 0x4b && header[2] == 3 && header[3] == 4;
222 }
223
224 private InputStreamSupplier prependBytesToStream(final byte[] bytes, final int len, final InputStream stream) {
225 return () -> len > 0 ? new SequenceInputStream(new ByteArrayInputStream(bytes, 0, len), stream) : stream;
226 }
227 }