1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.codehaus.plexus.archiver.zip;
19
20 import java.io.ByteArrayInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.SequenceInputStream;
24 import java.io.UncheckedIOException;
25 import java.util.concurrent.ExecutionException;
26 import java.util.zip.Deflater;
27 import java.util.zip.ZipEntry;
28
29 import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
30 import org.apache.commons.compress.archivers.zip.ScatterZipOutputStream;
31 import org.apache.commons.compress.archivers.zip.StreamCompressor;
32 import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
33 import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest;
34 import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
35 import org.apache.commons.compress.parallel.InputStreamSupplier;
36 import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
37 import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
38 import org.apache.commons.io.IOUtils;
39 import org.codehaus.plexus.archiver.util.Streams;
40
41 import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
42
43 public class ConcurrentJarCreator {
44
45 private final boolean compressAddedZips;
46
47 private final ScatterZipOutputStream metaInfDir;
48
49 private final ScatterZipOutputStream manifest;
50
51 private final ScatterZipOutputStream directories;
52
53 private final ScatterZipOutputStream synchronousEntries;
54
55 private final ParallelScatterZipCreator parallelScatterZipCreator;
56
57 private long zipCloseElapsed;
58
59 private static class DeferredSupplier implements ScatterGatherBackingStoreSupplier {
60
61 private int threshold;
62
63 DeferredSupplier(int threshold) {
64 this.threshold = threshold;
65 }
66
67 @Override
68 public ScatterGatherBackingStore get() throws IOException {
69 return new DeferredScatterOutputStream(threshold);
70 }
71 }
72
73 public static ScatterZipOutputStream createDeferred(
74 ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) throws IOException {
75 ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
76 StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs);
77 return new ScatterZipOutputStream(bs, sc);
78 }
79
80
81
82
83
84
85
86
87
88
89
90
91 public ConcurrentJarCreator(int nThreads) throws IOException {
92 this(true, nThreads);
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 public ConcurrentJarCreator(boolean compressAddedZips, int nThreads) throws IOException {
114 this.compressAddedZips = compressAddedZips;
115 ScatterGatherBackingStoreSupplier defaultSupplier = new DeferredSupplier(100000000 / nThreads);
116 metaInfDir = createDeferred(defaultSupplier);
117 manifest = createDeferred(defaultSupplier);
118 directories = createDeferred(defaultSupplier);
119 synchronousEntries = createDeferred(defaultSupplier);
120 parallelScatterZipCreator = new ParallelScatterZipCreator(
121 ConcurrentJarCreatorExecutorServiceFactory.createExecutorService(nThreads), defaultSupplier);
122 }
123
124
125
126
127
128
129
130
131
132
133
134
135
136 public void addArchiveEntry(
137 final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source, final boolean addInParallel)
138 throws IOException {
139 final int method = zipArchiveEntry.getMethod();
140 if (method == -1) {
141 throw new IllegalArgumentException("Method must be set on the supplied zipArchiveEntry");
142 }
143 final String zipEntryName = zipArchiveEntry.getName();
144 if ("META-INF".equals(zipEntryName) || "META-INF/".equals(zipEntryName)) {
145
146 if (zipArchiveEntry.isDirectory()) {
147 zipArchiveEntry.setMethod(ZipEntry.STORED);
148 }
149 metaInfDir.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
150 } else if ("META-INF/MANIFEST.MF".equals(zipEntryName)) {
151 manifest.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, source));
152 } else if (zipArchiveEntry.isDirectory() && !zipArchiveEntry.isUnixSymlink()) {
153 directories.addArchiveEntry(createZipArchiveEntryRequest(zipArchiveEntry, () -> Streams.EMPTY_INPUTSTREAM));
154 } else if (addInParallel) {
155 parallelScatterZipCreator.addArchiveEntry(() -> createEntry(zipArchiveEntry, source));
156 } else {
157 synchronousEntries.addArchiveEntry(createEntry(zipArchiveEntry, source));
158 }
159 }
160
161 public void writeTo(ZipArchiveOutputStream targetStream)
162 throws IOException, ExecutionException, InterruptedException {
163 metaInfDir.writeTo(targetStream);
164 manifest.writeTo(targetStream);
165 directories.writeTo(targetStream);
166 synchronousEntries.writeTo(targetStream);
167 parallelScatterZipCreator.writeTo(targetStream);
168 long startAt = System.currentTimeMillis();
169 targetStream.close();
170 zipCloseElapsed = System.currentTimeMillis() - startAt;
171 metaInfDir.close();
172 manifest.close();
173 directories.close();
174 synchronousEntries.close();
175 }
176
177
178
179
180
181
182 public String getStatisticsMessage() {
183 return parallelScatterZipCreator.getStatisticsMessage() + " Zip Close: " + zipCloseElapsed + "ms";
184 }
185
186 private ZipArchiveEntryRequest createEntry(
187 final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier inputStreamSupplier) {
188
189 if (compressAddedZips) {
190 return createZipArchiveEntryRequest(zipArchiveEntry, inputStreamSupplier);
191 }
192
193 InputStream is = inputStreamSupplier.get();
194
195 byte[] header = new byte[4];
196 try {
197 int read = is.read(header);
198 int compressionMethod = zipArchiveEntry.getMethod();
199 if (isZipHeader(header)) {
200 compressionMethod = ZipEntry.STORED;
201 }
202
203 zipArchiveEntry.setMethod(compressionMethod);
204
205 return createZipArchiveEntryRequest(zipArchiveEntry, prependBytesToStream(header, read, is));
206 } catch (IOException e) {
207 IOUtils.closeQuietly(is);
208 throw new UncheckedIOException(e);
209 }
210 }
211
212 private boolean isZipHeader(byte[] header) {
213 return header[0] == 0x50 && header[1] == 0x4b && header[2] == 3 && header[3] == 4;
214 }
215
216 private InputStreamSupplier prependBytesToStream(final byte[] bytes, final int len, final InputStream stream) {
217 return () -> len > 0 ? new SequenceInputStream(new ByteArrayInputStream(bytes, 0, len), stream) : stream;
218 }
219 }