1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.codehaus.plexus.archiver.zip;
19
20 import java.io.ByteArrayInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.SequenceInputStream;
24 import java.io.UncheckedIOException;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Executors;
27 import java.util.zip.Deflater;
28 import java.util.zip.ZipEntry;
29
30 import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
31 import org.apache.commons.compress.archivers.zip.ScatterZipOutputStream;
32 import org.apache.commons.compress.archivers.zip.StreamCompressor;
33 import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
34 import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest;
35 import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
36 import org.apache.commons.compress.parallel.InputStreamSupplier;
37 import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
38 import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
39 import org.apache.commons.compress.utils.IOUtils;
40 import org.codehaus.plexus.archiver.util.Streams;
41
42 import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
43
44 public class ConcurrentJarCreator {
45
46 private final boolean compressAddedZips;
47
48 private final ScatterZipOutputStream metaInfDir;
49
50 private final ScatterZipOutputStream manifest;
51
52 private final ScatterZipOutputStream directories;
53
54 private final ScatterZipOutputStream synchronousEntries;
55
56 private final ParallelScatterZipCreator parallelScatterZipCreator;
57
58 private long zipCloseElapsed;
59
60 private static class DeferredSupplier implements ScatterGatherBackingStoreSupplier {
61
62 private int threshold;
63
64 DeferredSupplier(int threshold) {
65 this.threshold = threshold;
66 }
67
68 @Override
69 public ScatterGatherBackingStore get() throws IOException {
70 return new DeferredScatterOutputStream(threshold);
71 }
72 }
73
74 public static ScatterZipOutputStream createDeferred(
75 ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
76 ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
77 StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs);
78 return new ScatterZipOutputStream(bs, sc);
79 }
80
81
82
83
84
85
86
87
88
89
90
91
92
93 public ConcurrentJarCreator(int nThreads) throws IOException {
94 this(true, nThreads);
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 public ConcurrentJarCreator(boolean compressAddedZips, int nThreads) throws IOException {
116 this.compressAddedZips = compressAddedZips;
117 ScatterGatherBackingStoreSupplier defaultSupplier = new DeferredSupplier(100000000 / nThreads);
118 metaInfDir = createDeferred(defaultSupplier);
119 manifest = createDeferred(defaultSupplier);
120 directories = createDeferred(defaultSupplier);
121 synchronousEntries = createDeferred(defaultSupplier);
122 parallelScatterZipCreator =
123 new ParallelScatterZipCreator(Executors.newFixedThreadPool(nThreads), defaultSupplier);
124 }
125
126
127
128
129
130
131
132
133
134
135
136
137
138 public void addArchiveEntry(
139 final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source, final boolean addInParallel)
140 throws IOException {
141 final int method = zipArchiveEntry.getMethod();
142 if (method == -1) {
143 throw new IllegalArgumentException("Method must be set on the supplied zipArchiveEntry");
144 }
145 final String zipEntryName = zipArchiveEntry.getName();
146 if ("META-INF".equals(zipEntryName) || "META-INF/".equals(zipEntryName)) {
147
148 if (zipArchiveEntry.isDirectory()) {
149 zipArchiveEntry.setMethod(ZipEntry.STORED);
150 }
151 metaInfDir.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
152 } else if ("META-INF/MANIFEST.MF".equals(zipEntryName)) {
153 manifest.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
154 } else if (zipArchiveEntry.isDirectory() && !zipArchiveEntry.isUnixSymlink()) {
155 directories.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, () -> Streams.EMPTY_INPUTSTREAM));
156 } else if (addInParallel) {
157 parallelScatterZipCreator.addArchiveEntry(() -> createEntry(zipArchiveEntry, source));
158 } else {
159 synchronousEntries.addArchiveEntry(createEntry(zipArchiveEntry, source));
160 }
161 }
162
163 public void writeTo(ZipArchiveOutputStream targetStream)
164 throws IOException, ExecutionException, InterruptedException {
165 metaInfDir.writeTo(targetStream);
166 manifest.writeTo(targetStream);
167 directories.writeTo(targetStream);
168 synchronousEntries.writeTo(targetStream);
169 parallelScatterZipCreator.writeTo(targetStream);
170 long startAt = System.currentTimeMillis();
171 targetStream.close();
172 zipCloseElapsed = System.currentTimeMillis() - startAt;
173 metaInfDir.close();
174 manifest.close();
175 directories.close();
176 synchronousEntries.close();
177 }
178
179
180
181
182
183
184 public String getStatisticsMessage() {
185 return parallelScatterZipCreator.getStatisticsMessage() + " Zip Close: " + zipCloseElapsed + "ms";
186 }
187
188 private ZipArchiveEntryRequest createEntry(
189 final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier inputStreamSupplier) {
190
191 if (compressAddedZips) {
192 return createZipArchiveEntryRequest(zipArchiveEntry, inputStreamSupplier);
193 }
194
195 InputStream is = inputStreamSupplier.get();
196
197 byte[] header = new byte[4];
198 try {
199 int read = is.read(header);
200 int compressionMethod = zipArchiveEntry.getMethod();
201 if (isZipHeader(header)) {
202 compressionMethod = ZipEntry.STORED;
203 }
204
205 zipArchiveEntry.setMethod(compressionMethod);
206
207 return createZipArchiveEntryRequest(zipArchiveEntry, prependBytesToStream(header, read, is));
208 } catch (IOException e) {
209 IOUtils.closeQuietly(is);
210 throw new UncheckedIOException(e);
211 }
212 }
213
214 private boolean isZipHeader(byte[] header) {
215 return header[0] == 0x50 && header[1] == 0x4b && header[2] == 3 && header[3] == 4;
216 }
217
218 private InputStreamSupplier prependBytesToStream(final byte[] bytes, final int len, final InputStream stream) {
219 return () -> len > 0 ? new SequenceInputStream(new ByteArrayInputStream(bytes, 0, len), stream) : stream;
220 }
221 }