Skip to content

Commit af7c32f

Browse files
dldiehl77artembilan
authored andcommitted
GH-1094: Add Deflater and Inflator PostProcessors
Fixes #1094 * Add/Fix Author tags and change copyright year. * Added documentation.
1 parent c746497 commit af7c32f

File tree

6 files changed

+177
-2
lines changed

6 files changed

+177
-2
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.support.postprocessor;
18+
19+
import java.io.IOException;
20+
import java.io.OutputStream;
21+
import java.util.zip.DeflaterOutputStream;
22+
23+
/**
24+
* A post processor that uses a {@link DeflaterOutputStream} to compress the message body.
25+
* Sets {@link org.springframework.amqp.core.MessageProperties#SPRING_AUTO_DECOMPRESS} to
26+
* true by default.
27+
*
28+
* @author David Diehl
29+
* @since 2.2
30+
*/
31+
public class DeflaterPostProcessor extends AbstractDeflaterPostProcessor {
32+
33+
public DeflaterPostProcessor() {
34+
super();
35+
}
36+
37+
public DeflaterPostProcessor(boolean autoDecompress) {
38+
super(autoDecompress);
39+
}
40+
41+
@Override
42+
protected OutputStream getCompressorStream(OutputStream zipped) throws IOException {
43+
return new DeflaterPostProcessor.SettableLevelDeflaterOutputStream(zipped, getLevel());
44+
}
45+
46+
@Override
47+
protected String getEncoding() {
48+
return "deflate";
49+
}
50+
51+
private static final class SettableLevelDeflaterOutputStream extends DeflaterOutputStream {
52+
53+
SettableLevelDeflaterOutputStream(OutputStream out, int level) {
54+
super(out);
55+
this.def.setLevel(level);
56+
}
57+
58+
}
59+
}

spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/DelegatingDecompressingPostProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@
2626

2727
/**
2828
* A {@link MessagePostProcessor} that delegates to one of its {@link MessagePostProcessor}s
29-
* depending on the content encoding. Supports {@code gzip, zip} by default.
29+
* depending on the content encoding. Supports {@code gzip, zip, deflate} by default.
3030
*
3131
* @author Gary Russell
32+
* @author David Diehl
3233
* @since 1.4.2
3334
*/
3435
public class DelegatingDecompressingPostProcessor implements MessagePostProcessor, Ordered {
@@ -40,6 +41,7 @@ public class DelegatingDecompressingPostProcessor implements MessagePostProcesso
4041
public DelegatingDecompressingPostProcessor() {
4142
this.decompressors.put("gzip", new GUnzipPostProcessor());
4243
this.decompressors.put("zip", new UnzipPostProcessor());
44+
this.decompressors.put("deflate", new InflaterPostProcessor());
4345
}
4446

4547
@Override
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.support.postprocessor;
18+
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.util.zip.InflaterInputStream;
22+
23+
/**
24+
* A post processor that uses a {@link InflaterInputStream} to decompress the
25+
* message body.
26+
*
27+
* @author David Diehl
28+
* @since 2.2
29+
*/
30+
public class InflaterPostProcessor extends AbstractDecompressingPostProcessor {
31+
public InflaterPostProcessor() {
32+
}
33+
34+
public InflaterPostProcessor(boolean alwaysDecompress) {
35+
super(alwaysDecompress);
36+
}
37+
38+
protected InputStream getDecompressorStream(InputStream zipped) throws IOException {
39+
return new InflaterInputStream(zipped);
40+
}
41+
42+
protected String getEncoding() {
43+
return "deflate";
44+
}
45+
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,11 @@
5757
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
5858
import org.springframework.amqp.support.AmqpHeaders;
5959
import org.springframework.amqp.support.postprocessor.AbstractCompressingPostProcessor;
60+
import org.springframework.amqp.support.postprocessor.DeflaterPostProcessor;
6061
import org.springframework.amqp.support.postprocessor.DelegatingDecompressingPostProcessor;
6162
import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor;
6263
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
64+
import org.springframework.amqp.support.postprocessor.InflaterPostProcessor;
6365
import org.springframework.amqp.support.postprocessor.UnzipPostProcessor;
6466
import org.springframework.amqp.support.postprocessor.ZipPostProcessor;
6567
import org.springframework.amqp.utils.test.TestUtils;
@@ -72,6 +74,7 @@
7274
* @author Gary Russell
7375
* @author Artem Bilan
7476
* @author Mohammad Hewedy
77+
* @author David Diehl
7578
*
7679
* @since 1.4.1
7780
*
@@ -517,6 +520,68 @@ public void testSimpleBatchZippedWithEncoding() throws Exception {
517520
assertThat(new String(message.getBody())).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
518521
}
519522

523+
@Test
524+
public void testSimpleBatchDeflater() throws Exception {
525+
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(2, Integer.MAX_VALUE, 30000);
526+
BatchingRabbitTemplate template = new BatchingRabbitTemplate(batchingStrategy, this.scheduler);
527+
template.setConnectionFactory(this.connectionFactory);
528+
DeflaterPostProcessor deflaterPostProcessor = new DeflaterPostProcessor();
529+
assertThat(getStreamLevel(deflaterPostProcessor)).isEqualTo(Deflater.BEST_SPEED);
530+
template.setBeforePublishPostProcessors(deflaterPostProcessor);
531+
MessageProperties props = new MessageProperties();
532+
Message message = new Message("foo".getBytes(), props);
533+
template.send("", ROUTE, message);
534+
message = new Message("bar".getBytes(), props);
535+
template.send("", ROUTE, message);
536+
message = receive(template);
537+
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("deflate");
538+
InflaterPostProcessor inflater = new InflaterPostProcessor();
539+
message = inflater.postProcessMessage(message);
540+
assertThat(new String(message.getBody())).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
541+
}
542+
543+
@Test
544+
public void testSimpleBatchDeflaterBestCompression() throws Exception {
545+
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(2, Integer.MAX_VALUE, 30000);
546+
BatchingRabbitTemplate template = new BatchingRabbitTemplate(batchingStrategy, this.scheduler);
547+
template.setConnectionFactory(this.connectionFactory);
548+
DeflaterPostProcessor deflaterPostProcessor = new DeflaterPostProcessor();
549+
deflaterPostProcessor.setLevel(Deflater.BEST_COMPRESSION);
550+
assertThat(getStreamLevel(deflaterPostProcessor)).isEqualTo(Deflater.BEST_COMPRESSION);
551+
template.setBeforePublishPostProcessors(deflaterPostProcessor);
552+
MessageProperties props = new MessageProperties();
553+
Message message = new Message("foo".getBytes(), props);
554+
template.send("", ROUTE, message);
555+
message = new Message("bar".getBytes(), props);
556+
template.send("", ROUTE, message);
557+
message = receive(template);
558+
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("deflate");
559+
InflaterPostProcessor inflater = new InflaterPostProcessor();
560+
message = inflater.postProcessMessage(message);
561+
assertThat(new String(message.getBody())).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
562+
}
563+
564+
@Test
565+
public void testSimpleBatchDeflaterWithEncoding() throws Exception {
566+
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(2, Integer.MAX_VALUE, 30000);
567+
BatchingRabbitTemplate template = new BatchingRabbitTemplate(batchingStrategy, this.scheduler);
568+
template.setConnectionFactory(this.connectionFactory);
569+
DeflaterPostProcessor deflaterPostProcessor = new DeflaterPostProcessor();
570+
assertThat(getStreamLevel(deflaterPostProcessor)).isEqualTo(Deflater.BEST_SPEED);
571+
template.setBeforePublishPostProcessors(deflaterPostProcessor);
572+
MessageProperties props = new MessageProperties();
573+
props.setContentEncoding("foo");
574+
Message message = new Message("foo".getBytes(), props);
575+
template.send("", ROUTE, message);
576+
message = new Message("bar".getBytes(), props);
577+
template.send("", ROUTE, message);
578+
message = receive(template);
579+
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("deflate:foo");
580+
InflaterPostProcessor inflater = new InflaterPostProcessor();
581+
message = inflater.postProcessMessage(message);
582+
assertThat(new String(message.getBody())).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
583+
}
584+
520585
private Message receive(BatchingRabbitTemplate template) throws InterruptedException {
521586
Message message = template.receive(ROUTE);
522587
int n = 0;

src/reference/asciidoc/amqp.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3689,7 +3689,7 @@ When using batching (see <<template-batching>>), this is invoked after the batch
36893689
The second is invoked immediately after a message is received.
36903690

36913691
These extension points are used for such features as compression and, for this purpose, several `MessagePostProcessor` implementations are provided.
3692-
`GZipPostProcessor` and `ZipPostProcessor` compress messages before sending, and `GUnzipPostProcessor` and `UnzipPostProcessor` decompress received messages.
3692+
`GZipPostProcessor`, `ZipPostProcessor` and `DeflaterPostProcessor` compress messages before sending, and `GUnzipPostProcessor`, `UnzipPostProcessor` and `InflaterPostProcessor` decompress received messages.
36933693

36943694
NOTE: Starting with version 2.1.5, the `GZipPostProcessor` can be configured with the `copyProperties = true` option to make a copy of the original message properties.
36953695
By default, these properties are reused for performance reasons, and modified with compression content encoding and the optional `MessageProperties.SPRING_AUTO_DECOMPRESS` header.

src/reference/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ See <<template-confirms>> for more information.
9090

9191
Also, the publisher confirm type is now specified with the `ConfirmType` enum instead of the two mutually exclusive setter methods.
9292

93+
===== New MessagePostProcessor Classes
94+
95+
Classes `DeflaterPostProcessor` and `InflaterPostProcessor` were added to support compression and decompression, respectively, when the message content-encoding is set to `deflate`.
96+
9397
===== Other Changes
9498

9599
The `Declarables` object (for declaring multiple queues, exchanges, bindings) now has a filtered getter for each type.

0 commit comments

Comments
 (0)