KAFKA-20173: Propagate headers into serde 7/N#21851
KAFKA-20173: Propagate headers into serde 7/N#21851UladzislauBlok wants to merge 7 commits intoapache:trunkfrom
Conversation
| final Headers headers = new RecordHeaders().add("key", "value".getBytes()); | ||
| final SubscriptionResponseWrapper<String> data = new SubscriptionResponseWrapper<>(null, foreignValue, 1); | ||
|
|
||
| final SubscriptionResponseWrapperSerde<String> testSerde = new SubscriptionResponseWrapperSerde<>(mockSerde); |
There was a problem hiding this comment.
There seems to be duplicated code until creating testSerde in both the tests. Possible to move to common util method?
There was a problem hiding this comment.
Tbh I didn't get it. What exactly is duplicated?
There was a problem hiding this comment.
Ah, I meant from topic creation till creation of the serde object, looks like that snippet could be moved into a util method? So later if we add more tests, that method could be reused ?
There was a problem hiding this comment.
I still can't see it. Do you want to move variable declaration to separate method? I use them later for assertions
| testSerde.deserializer().deserialize(topic, headers, serializedData); | ||
|
|
||
| verify(mockDeserializer).deserialize(topic, headers, "foreignValue".getBytes()); | ||
| verify(mockDeserializer, never()).deserialize(topic, "foreignValue".getBytes()); |
There was a problem hiding this comment.
Can we also verify if actual headers content "key" and "value" are part of the headers after deserialization?
There was a problem hiding this comment.
Headers are not serialized. The idea of this ticket is to propagate headers to underlying serde
See TimestampValueHeadersSerializer and TimestampValueHeadersDeserializer
|
|
||
| @Override | ||
| public byte[] serialize(final String topic, final SubscriptionResponseWrapper<V> data) { | ||
| return serialize(topic, new RecordHeaders(), data); |
There was a problem hiding this comment.
Don't we need to make changes in data class (SubscriptionResponseWrapper) constructor with new field 'headers' ?
There was a problem hiding this comment.
I don't think so. The idea of this change is to propagate headers to serde code. I think headers are propagated withing KS through internal record same as key and value (but may be I'm missing smtg)
see: https://issues.apache.org/jira/browse/KAFKA-20173
There was a problem hiding this comment.
Right. I was thinking headers should propagate and it's already done via record. Pls ignore my comment and thanks for explaining.
| import static org.hamcrest.MatcherAssert.assertThat; | ||
| import static org.hamcrest.Matchers.is; | ||
| import static org.hamcrest.Matchers.notNullValue; |
There was a problem hiding this comment.
@mjsax Do we use org.hamcrest? I thought AssertJ was chosen for KS.... Well never asked that, but had an assumption :)
This PR ensures headers are propagated in
SubscriptionResponseWrapperSerde,LeftOrRightValueSerializer/DeserializerandTimestampedKeyAndJoinSideSerializer/DeserializerAdded unit tests to check headers propagation