Skip to content

KAFKA-20173: Propagate headers into serde 7/N#21851

Open
UladzislauBlok wants to merge 7 commits intoapache:trunkfrom
UladzislauBlok:bloku/kafka-20173-8
Open

KAFKA-20173: Propagate headers into serde 7/N#21851
UladzislauBlok wants to merge 7 commits intoapache:trunkfrom
UladzislauBlok:bloku/kafka-20173-8

Conversation

@UladzislauBlok
Copy link
Contributor

@UladzislauBlok UladzislauBlok commented Mar 22, 2026

This PR ensures headers are propagated in SubscriptionResponseWrapperSerde, LeftOrRightValueSerializer/Deserializer and TimestampedKeyAndJoinSideSerializer/Deserializer
Added unit tests to check headers propagation

@github-actions github-actions bot added triage PRs from the community streams labels Mar 22, 2026
final Headers headers = new RecordHeaders().add("key", "value".getBytes());
final SubscriptionResponseWrapper<String> data = new SubscriptionResponseWrapper<>(null, foreignValue, 1);

final SubscriptionResponseWrapperSerde<String> testSerde = new SubscriptionResponseWrapperSerde<>(mockSerde);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be duplicated code until creating testSerde in both the tests. Possible to move to common util method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tbh I didn't get it. What exactly is duplicated?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I meant from topic creation till creation of the serde object, looks like that snippet could be moved into a util method? So later if we add more tests, that method could be reused ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still can't see it. Do you want to move variable declaration to separate method? I use them later for assertions

testSerde.deserializer().deserialize(topic, headers, serializedData);

verify(mockDeserializer).deserialize(topic, headers, "foreignValue".getBytes());
verify(mockDeserializer, never()).deserialize(topic, "foreignValue".getBytes());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also verify if actual headers content "key" and "value" are part of the headers after deserialization?

Copy link
Contributor Author

@UladzislauBlok UladzislauBlok Mar 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Headers are not serialized. The idea of this ticket is to propagate headers to underlying serde
See TimestampValueHeadersSerializer and TimestampValueHeadersDeserializer


@Override
public byte[] serialize(final String topic, final SubscriptionResponseWrapper<V> data) {
return serialize(topic, new RecordHeaders(), data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to make changes in data class (SubscriptionResponseWrapper) constructor with new field 'headers' ?

Copy link
Contributor Author

@UladzislauBlok UladzislauBlok Mar 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. The idea of this change is to propagate headers to serde code. I think headers are propagated withing KS through internal record same as key and value (but may be I'm missing smtg)
see: https://issues.apache.org/jira/browse/KAFKA-20173

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I was thinking headers should propagate and it's already done via record. Pls ignore my comment and thanks for explaining.

Comment on lines 27 to 29
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax Do we use org.hamcrest? I thought AssertJ was chosen for KS.... Well never asked that, but had an assumption :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

streams triage PRs from the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants