@@ -24,6 +24,8 @@ use std::io::{Cursor, Read, Write};
 #[cfg(test)]
 use proptest::prelude::*;
 
+use crate::protocol::vec_builder::DEFAULT_BLOCK_SIZE;
+
 use super::{
     primitives::{Int16, Int32, Int64, Int8, Varint, Varlong},
     traits::{ReadError, ReadType, WriteError, WriteType},
@@ -571,6 +573,7 @@ where
             #[cfg(feature = "compression-gzip")]
             RecordBatchCompression::Gzip => {
                 use flate2::read::GzDecoder;
+
                 let mut decoder = GzDecoder::new(reader);
                 let records = Self::read_records(&mut decoder, is_control, n_records)?;
 
@@ -581,6 +584,7 @@ where
             #[cfg(feature = "compression-lz4")]
             RecordBatchCompression::Lz4 => {
                 use lz4::Decoder;
+
                 let mut decoder = Decoder::new(reader)?;
                 let records = Self::read_records(&mut decoder, is_control, n_records)?;
 
@@ -592,6 +596,61 @@ where
 
                 records
             }
+            #[cfg(feature = "compression-snappy")]
+            RecordBatchCompression::Snappy => {
+                use snap::raw::{decompress_len, Decoder};
+
+                // Construct the input for the raw decoder.
+                let mut input = vec![];
+                reader.read_to_end(&mut input)?;
+
+                // The snappy compression used here is unframed aka "raw". So we first need to figure out the
+                // uncompressed length. See:
+                //
+                // - https://github.com/edenhill/librdkafka/blob/2b76b65212e5efda213961d5f84e565038036270/src/rdkafka_msgset_reader.c#L345-L348
+                // - https://github.com/edenhill/librdkafka/blob/747f77c98fbddf7dc6508f76398e0fc9ee91450f/src/snappy.c#L779
+                let uncompressed_size = decompress_len(&input).map_err(|e| ReadError::Malformed(Box::new(e)))?;
+
+                // Decode the snappy payload.
+                // The claimed uncompressed length is unchecked and can be up to 2^32 - 1 bytes, so it is a
+                // decompression-bomb DoS vector: start with a small buffer and double it only as needed.
+                let mut max_uncompressed_size = DEFAULT_BLOCK_SIZE;
+                let output = loop {
+                    let try_uncompressed_size = uncompressed_size.min(max_uncompressed_size);
+
+                    let mut decoder = Decoder::new();
+                    let mut output = vec![0; try_uncompressed_size];
+                    let actual_uncompressed_size = match decoder.decompress(&input, &mut output) {
+                        Ok(size) => size,
+                        Err(snap::Error::BufferTooSmall { .. })
+                            if max_uncompressed_size < uncompressed_size =>
+                        {
+                            // try a larger buffer
+                            max_uncompressed_size *= 2;
+                            continue;
+                        }
+                        Err(e) => {
+                            return Err(ReadError::Malformed(Box::new(e)));
+                        }
+                    };
+                    if actual_uncompressed_size != uncompressed_size {
+                        return Err(ReadError::Malformed(
+                            "broken snappy data".to_string().into(),
+                        ));
+                    }
+
+                    break output;
+                };
+
+                // Read uncompressed records.
+                let mut decoder = Cursor::new(output);
+                let records = Self::read_records(&mut decoder, is_control, n_records)?;
+
+                // Check that there's no data left within the uncompressed block.
+                ensure_eof(&mut decoder, "Data left in Snappy block")?;
+
+                records
+            }
             #[allow(unreachable_patterns)]
             _ => {
                 return Err(ReadError::Malformed(
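For reference, the capped-and-doubling decode added above can be exercised on its own against the `snap` crate's raw API. The following is a minimal sketch rather than code from this repository: `decompress_raw_capped` and its `initial_cap` parameter are illustrative stand-ins for the `DEFAULT_BLOCK_SIZE`-based logic (that constant's value is not shown in this diff).

```rust
use snap::raw::{decompress_len, Decoder};

/// Decompress raw (unframed) snappy data without trusting the length claimed
/// by the header: start with a small buffer and double it until the data fits.
fn decompress_raw_capped(input: &[u8], initial_cap: usize) -> Result<Vec<u8>, snap::Error> {
    // Length claimed by the raw snappy header; attacker-controlled, so treat with care.
    let uncompressed_size = decompress_len(input)?;

    let mut cap = initial_cap.max(1);
    loop {
        let mut output = vec![0; uncompressed_size.min(cap)];
        match Decoder::new().decompress(input, &mut output) {
            Ok(n) => {
                output.truncate(n);
                return Ok(output);
            }
            // Our cap (not the claimed length) was the limiting factor: grow and retry.
            Err(snap::Error::BufferTooSmall { .. }) if cap < uncompressed_size => cap *= 2,
            Err(e) => return Err(e),
        }
    }
}

fn main() -> Result<(), snap::Error> {
    let compressed = snap::raw::Encoder::new().compress_vec(b"hello kafka")?;
    assert_eq!(decompress_raw_capped(&compressed, 16)?, b"hello kafka");
    Ok(())
}
```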
@@ -728,13 +787,15 @@ where
             #[cfg(feature = "compression-gzip")]
             RecordBatchCompression::Gzip => {
                 use flate2::{write::GzEncoder, Compression};
+
                 let mut encoder = GzEncoder::new(writer, Compression::default());
                 Self::write_records(&mut encoder, self.records)?;
                 encoder.finish()?;
             }
             #[cfg(feature = "compression-lz4")]
             RecordBatchCompression::Lz4 => {
                 use lz4::{liblz4::BlockMode, EncoderBuilder};
+
                 let mut encoder = EncoderBuilder::new()
                     .block_mode(
                         // the only one supported by Kafka
@@ -745,6 +806,21 @@ where
                 let (_writer, res) = encoder.finish();
                 res?;
             }
+            #[cfg(feature = "compression-snappy")]
+            RecordBatchCompression::Snappy => {
+                use snap::raw::{max_compress_len, Encoder};
+
+                let mut input = vec![];
+                Self::write_records(&mut input, self.records)?;
+
+                let mut encoder = Encoder::new();
+                let mut output = vec![0; max_compress_len(input.len())];
+                let len = encoder
+                    .compress(&input, &mut output)
+                    .map_err(|e| WriteError::Malformed(Box::new(e)))?;
+
+                writer.write_all(&output[..len])?;
+            }
             #[allow(unreachable_patterns)]
             _ => {
                 return Err(WriteError::Malformed(
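The write path leans on two guarantees of snappy's raw format: `max_compress_len(input.len())` bounds the worst-case output size, and `compress` returns the number of bytes actually written. A self-contained sketch of the same pattern (the helper name `compress_raw` is illustrative, not part of this repository):

```rust
use snap::raw::{max_compress_len, Decoder, Encoder};

/// Compress `input` as raw (unframed) snappy, the variant Kafka uses.
fn compress_raw(input: &[u8]) -> Result<Vec<u8>, snap::Error> {
    // Worst-case output size for raw snappy; `compress` reports how much was used.
    let mut output = vec![0; max_compress_len(input.len())];
    let len = Encoder::new().compress(input, &mut output)?;
    output.truncate(len);
    Ok(output)
}

fn main() -> Result<(), snap::Error> {
    let compressed = compress_raw(b"hello kafka")?;
    // Round-trip through the raw decoder to confirm the framing matches.
    assert_eq!(Decoder::new().decompress_vec(&compressed)?, b"hello kafka");
    Ok(())
}
```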
@@ -965,4 +1041,54 @@ mod tests {
         let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
         assert_eq!(actual2, expected);
     }
+
+    #[cfg(feature = "compression-snappy")]
+    #[test]
+    fn test_decode_fixture_snappy() {
+        // This data was obtained by watching rdkafka.
+        let data = [
+            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x58\x00\x00\x00\x00".to_vec(),
+            b"\x02\xad\x86\xf4\xf4\x00\x02\x00\x00\x00\x00\x00\x00\x01\x7e\xb6".to_vec(),
+            b"\x45\x0e\x52\x00\x00\x01\x7e\xb6\x45\x0e\x52\xff\xff\xff\xff\xff".to_vec(),
+            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x80\x01\x1c".to_vec(),
+            b"\xfc\x01\x00\x00\x00\xc8\x01\x78\xfe\x01\x00\x8a\x01\x00\x50\x16".to_vec(),
+            b"\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02\x06\x66\x6f\x6f".to_vec(),
+            b"\x06\x62\x61\x72".to_vec(),
+        ]
+        .concat();
+
+        let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
+        let expected = RecordBatch {
+            base_offset: 0,
+            partition_leader_epoch: 0,
+            last_offset_delta: 0,
+            first_timestamp: 1643735486034,
+            max_timestamp: 1643735486034,
+            producer_id: -1,
+            producer_epoch: -1,
+            base_sequence: -1,
+            records: ControlBatchOrRecords::Records(vec![Record {
+                timestamp_delta: 0,
+                offset_delta: 0,
+                key: vec![b'x'; 100],
+                value: b"hello kafka".to_vec(),
+                headers: vec![RecordHeader {
+                    key: "foo".to_owned(),
+                    value: b"bar".to_vec(),
+                }],
+            }]),
+            compression: RecordBatchCompression::Snappy,
+            is_transactional: false,
+            timestamp_type: RecordBatchTimestampType::CreateTime,
+        };
+        assert_eq!(actual, expected);
+
+        let mut data2 = vec![];
+        actual.write(&mut data2).unwrap();
+
+        // Don't compare the raw bytes, because the compression encoder might work slightly
+        // differently; use another roundtrip instead.
+        let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
+        assert_eq!(actual2, expected);
+    }
 }