1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.codehaus.plexus.archiver.zip;
19
20 import java.io.ByteArrayInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.SequenceInputStream;
24 import java.io.UncheckedIOException;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Executors;
27 import java.util.zip.Deflater;
28 import java.util.zip.ZipEntry;
29
30 import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
31 import org.apache.commons.compress.archivers.zip.ScatterZipOutputStream;
32 import org.apache.commons.compress.archivers.zip.StreamCompressor;
33 import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
34 import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest;
35 import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
36 import org.apache.commons.compress.parallel.InputStreamSupplier;
37 import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
38 import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
39 import org.apache.commons.io.IOUtils;
40 import org.codehaus.plexus.archiver.util.Streams;
41
42 import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
43
44 public class ConcurrentJarCreator {
45
46 private final boolean compressAddedZips;
47
48 private final ScatterZipOutputStream metaInfDir;
49
50 private final ScatterZipOutputStream manifest;
51
52 private final ScatterZipOutputStream directories;
53
54 private final ScatterZipOutputStream synchronousEntries;
55
56 private final ParallelScatterZipCreator parallelScatterZipCreator;
57
58 private long zipCloseElapsed;
59
60 private static class DeferredSupplier implements ScatterGatherBackingStoreSupplier {
61
62 private int threshold;
63
64 DeferredSupplier(int threshold) {
65 this.threshold = threshold;
66 }
67
68 @Override
69 public ScatterGatherBackingStore get() throws IOException {
70 return new DeferredScatterOutputStream(threshold);
71 }
72 }
73
74 public static ScatterZipOutputStream createDeferred(
75 ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
76 ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
77 StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs);
78 return new ScatterZipOutputStream(bs, sc);
79 }
80
81
82
83
84
85
86
87
88
89
90
91
92 public ConcurrentJarCreator(int nThreads) throws IOException {
93 this(true, nThreads);
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 public ConcurrentJarCreator(boolean compressAddedZips, int nThreads) throws IOException {
115 this.compressAddedZips = compressAddedZips;
116 ScatterGatherBackingStoreSupplier defaultSupplier = new DeferredSupplier(10000000 / nThreads);
117 metaInfDir = createDeferred(defaultSupplier);
118 manifest = createDeferred(defaultSupplier);
119 directories = createDeferred(defaultSupplier);
120 synchronousEntries = createDeferred(defaultSupplier);
121 parallelScatterZipCreator =
122 new ParallelScatterZipCreator(Executors.newFixedThreadPool(nThreads), defaultSupplier);
123 }
124
125
126
127
128
129
130
131
132
133
134
135
136
137 public void addArchiveEntry(
138 final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source, final boolean addInParallel)
139 throws IOException {
140 final int method = zipArchiveEntry.getMethod();
141 if (method == -1) {
142 throw new IllegalArgumentException("Method must be set on the supplied zipArchiveEntry");
143 }
144 final String zipEntryName = zipArchiveEntry.getName();
145 if ("META-INF".equals(zipEntryName) || "META-INF/".equals(zipEntryName)) {
146
147 if (zipArchiveEntry.isDirectory()) {
148 zipArchiveEntry.setMethod(ZipEntry.STORED);
149 }
150 metaInfDir.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
151 } else if ("META-INF/MANIFEST.MF".equals(zipEntryName)) {
152 manifest.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
153 } else if (zipArchiveEntry.isDirectory() && !zipArchiveEntry.isUnixSymlink()) {
154 directories.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, () -> Streams.EMPTY_INPUTSTREAM));
155 } else if (addInParallel) {
156 parallelScatterZipCreator.addArchiveEntry(() -> createEntry(zipArchiveEntry, source));
157 } else {
158 synchronousEntries.addArchiveEntry(createEntry(zipArchiveEntry, source));
159 }
160 }
161
162 public void writeTo(ZipArchiveOutputStream targetStream)
163 throws IOException, ExecutionException, InterruptedException {
164 metaInfDir.writeTo(targetStream);
165 manifest.writeTo(targetStream);
166 directories.writeTo(targetStream);
167 synchronousEntries.writeTo(targetStream);
168 parallelScatterZipCreator.writeTo(targetStream);
169 long startAt = System.currentTimeMillis();
170 targetStream.close();
171 zipCloseElapsed = System.currentTimeMillis() - startAt;
172 metaInfDir.close();
173 manifest.close();
174 directories.close();
175 synchronousEntries.close();
176 }
177
178
179
180
181
182
183 public String getStatisticsMessage() {
184 return parallelScatterZipCreator.getStatisticsMessage() + " Zip Close: " + zipCloseElapsed + "ms";
185 }
186
187 private ZipArchiveEntryRequest createEntry(
188 final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier inputStreamSupplier) {
189
190 if (compressAddedZips) {
191 return createZipArchiveEntryRequest(zipArchiveEntry, inputStreamSupplier);
192 }
193
194 InputStream is = inputStreamSupplier.get();
195
196 byte[] header = new byte[4];
197 try {
198 int read = is.read(header);
199 int compressionMethod = zipArchiveEntry.getMethod();
200 if (isZipHeader(header)) {
201 compressionMethod = ZipEntry.STORED;
202 }
203
204 zipArchiveEntry.setMethod(compressionMethod);
205
206 return createZipArchiveEntryRequest(zipArchiveEntry, prependBytesToStream(header, read, is));
207 } catch (IOException e) {
208 IOUtils.closeQuietly(is);
209 throw new UncheckedIOException(e);
210 }
211 }
212
213 private boolean isZipHeader(byte[] header) {
214 return header[0] == 0x50 && header[1] == 0x4b && header[2] == 3 && header[3] == 4;
215 }
216
217 private InputStreamSupplier prependBytesToStream(final byte[] bytes, final int len, final InputStream stream) {
218 return () -> len > 0 ? new SequenceInputStream(new ByteArrayInputStream(bytes, 0, len), stream) : stream;
219 }
220 }