1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 package org.codehaus.plexus.util.cli;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 import java.io.ByteArrayInputStream;
56 import java.io.IOException;
57 import java.io.InputStream;
58 import java.io.PrintWriter;
59 import java.io.StringWriter;
60 import java.util.ArrayList;
61 import java.util.List;
62
63 import org.junit.jupiter.api.Test;
64
65 import static org.junit.jupiter.api.Assertions.assertEquals;
66 import static org.junit.jupiter.api.Assertions.assertNotNull;
67 import static org.junit.jupiter.api.Assertions.assertTrue;
68
69
70
71
72
73
74
75
76 public class StreamPumperTest {
77 private String lineSeparator = System.lineSeparator();
78
79
80
81
82 @Test
83 public void testPumping() {
84 String line1 = "line1";
85 String line2 = "line2";
86 String lines = line1 + "\n" + line2;
87 ByteArrayInputStream inputStream = new ByteArrayInputStream(lines.getBytes());
88
89 TestConsumer consumer = new TestConsumer();
90 StreamPumper pumper = new StreamPumper(inputStream, consumer);
91 new Thread(pumper).run();
92
93
94 assertTrue(consumer.wasLineConsumed(line1, 1000));
95 assertTrue(consumer.wasLineConsumed(line2, 1000));
96 }
97
98
99
100
101 @org.junit.jupiter.api.Test
102 public void testPumpingWithPrintWriter() {
103 String inputString = "This a test string";
104 ByteArrayInputStream bais = new ByteArrayInputStream(inputString.getBytes());
105 StringWriter sw = new StringWriter();
106 PrintWriter pw = new PrintWriter(sw);
107 StreamPumper pumper = new StreamPumper(bais, pw);
108 pumper.run();
109 pumper.flush();
110 System.out.println("aaa" + sw.toString());
111 assertEquals("This a test string" + lineSeparator, sw.toString());
112 pumper.close();
113 }
114
115
116
117
118 @Test
119 public void testPumperReadsInputStreamUntilEndEvenIfConsumerFails() {
120
121 GeneratorInputStream gis = new GeneratorInputStream(1024 * 1024 * 4);
122 StreamPumper pumper = new StreamPumper(gis, new FailingConsumer());
123 pumper.run();
124 assertEquals(gis.size, gis.read, "input stream was not fully consumed, producer deadlocks");
125 assertTrue(gis.closed);
126 assertNotNull(pumper.getException());
127 }
128
129 static class GeneratorInputStream extends InputStream {
130
131 final int size;
132
133 int read = 0;
134
135 boolean closed = false;
136
137 public GeneratorInputStream(int size) {
138 this.size = size;
139 }
140
141 public int read() throws IOException {
142 if (read < size) {
143 read++;
144 return '\n';
145 } else {
146 return -1;
147 }
148 }
149
150 public void close() throws IOException {
151 closed = true;
152 }
153 }
154
155 static class FailingConsumer implements StreamConsumer {
156
157 public void consumeLine(String line) {
158 throw new NullPointerException("too bad, the consumer is badly implemented...");
159 }
160 }
161
162
163
164
165 static class TestConsumer implements StreamConsumer {
166
167 private List<String> lines = new ArrayList<String>();
168
169
170
171
172
173
174
175
176
177 public boolean wasLineConsumed(String testLine, long timeout) {
178
179 long start = System.currentTimeMillis();
180 long trialTime = 0;
181
182 do {
183 if (lines.contains(testLine)) {
184 return true;
185 }
186
187
188 try {
189 Thread.sleep(10);
190 } catch (InterruptedException e) {
191
192 }
193
194
195 trialTime = System.currentTimeMillis() - start;
196
197 } while (trialTime < timeout);
198
199
200 return false;
201 }
202
203 public void consumeLine(String line) {
204 lines.add(line);
205 }
206 }
207
208
209
210
211 @org.junit.jupiter.api.Test
212 public void testEnabled() {
213 ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream("AB\nCE\nEF".getBytes());
214 TestConsumer streamConsumer = new TestConsumer();
215 StreamPumper streamPumper = new StreamPumper(byteArrayInputStream, streamConsumer);
216 streamPumper.run();
217 assertEquals(3, streamConsumer.lines.size());
218 }
219
220
221
222
223 @Test
224 public void testDisabled() {
225 ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream("AB\nCE\nEF".getBytes());
226 TestConsumer streamConsumer = new TestConsumer();
227 StreamPumper streamPumper = new StreamPumper(byteArrayInputStream, streamConsumer);
228 streamPumper.disable();
229 streamPumper.run();
230 assertEquals(0, streamConsumer.lines.size());
231 }
232 }