This article presents an approach to writing integration tests for Kafka-based applications that focuses on interaction specification, making tests more readable and easier to maintain. The proposed method not only improves testing efficiency but also contributes to a better understanding of the integration processes within the application.
The article builds on three ideas presented in related articles: writing tests with a clear separation of the Arrange-Act-Assert stages, isolation in Kafka tests, and using tools to improve test visibility. I recommend reviewing these before diving into the material of this article.
Demonstration Scenario
As an example, let's take a Telegram bot that forwards requests to the OpenAI API and returns the result to the user. If the request to OpenAI violates the system's security rules, the user will be notified. Additionally, a message will be sent to Kafka for the behavioral control system so that a manager can contact the user, explain that their request was too sensitive even for our bot, and ask them to review their preferences.
The interaction contracts with the services are described in a simplified way to emphasize the core logic. Below is a sequence diagram demonstrating the application's architecture. I understand that the design may raise questions from a system architecture perspective, but please approach it with understanding; the main goal here is to demonstrate the approach to writing tests.
Message Capture
The main testing tool will be the message capture object, RecordCaptor. It works much like the outgoing request capture object, RequestCaptor, which is described in the article Ordering Chaos: Arranging HTTP Request Testing in Spring (linked earlier).
Message capture can be implemented with a standard Kafka consumer. The list of topics must be specified explicitly via a configuration parameter.
@KafkaListener(id = "recordCaptor", topics = "#{'${test.record-captor.topics}'.split(',')}", groupId = "test")
public void eventCaptorListener(ConsumerRecord<String, String> consumerRecord) {
    // the captured record is handed over to the RecordCaptor (body omitted in the original snippet)
}
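The property referenced in the SpEL expression is just a comma-separated list of topics to capture; for example (the topic names here are illustrative):

# application-test.properties (illustrative values)
test.record-captor.topics=topicA,topicB,topicC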
The RecordCaptor object accumulates information from the captured messages.
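To give an idea of the shape of such an accumulator, here is a minimal sketch; the class name and generics are assumptions, and the real RecordCaptor stores mapped snapshots (see RecordSnapshotMapper further down) rather than raw records.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Minimal sketch of a record accumulator (assumed structure, not the actual RecordCaptor code):
// captured entries are grouped by topic and message key.
public class RecordCaptorSketch<T> {

    private final Map<String, Map<String, List<T>>> byTopicAndKey = new ConcurrentHashMap<>();

    public void capture(String topic, String key, T snapshot) {
        byTopicAndKey
                .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
                .computeIfAbsent(key, k -> new CopyOnWriteArrayList<>())
                .add(snapshot);
    }

    public List<T> getRecords(String topic, String key) {
        return byTopicAndKey
                .getOrDefault(topic, Map.of())
                .getOrDefault(key, List.of());
    }
}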
Using this approach requires adhering to isolation in Kafka tests. Waiting for offset commit confirmation before verifying the test results should be done using the KafkaSupport#waitForPartitionOffsetCommit method.
Test Example
Below is the test code for the described scenario.
def "User Message Processing with OpenAI"() {
setup:
KafkaSupport.waitForPartitionAssignment(applicationContext) // 1
and: // 2
def openaiRequestCaptor = restExpectation.openai.completions(withBadRequest().contentType(APPLICATION_JSON)
.physique("""{
"error": {
"code": "content_policy_violation",
"message": "Your request was rejected as a result of our safety system."
}
}"""))
def telegramRequestCaptor = restExpectation.telegram.sendMessage(withSuccess('{}', APPLICATION_JSON))
when:
mockMvc.carry out(publish("/telegram/webhook") // 3
.contentType(APPLICATION_JSON_VALUE)
.content material("""{
"message": {
"from": {
"id": 10000000
},
"chat": {
"id": 20000000
},
"text": "Hello!"
}
}""".toString())
.settle for(APPLICATION_JSON_VALUE))
.andExpect(standing().isOk())
KafkaSupport.waitForPartitionOffsetCommit(applicationContext) // 4
then:
openaiRequestCaptor.instances == 1 // 5
JSONAssert.assertEquals("""{
"content": "Hello!"
}""", openaiRequestCaptor.bodyString, false)
and:
telegramRequestCaptor.instances == 1
JSONAssert.assertEquals("""{
"chatId": "20000000",
"text": "Your request was rejected as a result of our safety system."
}""", telegramRequestCaptor.bodyString, false)
when: // 6
def message = recordCaptor.getRecords("topicC", "20000000").final
then:
message != null
JSONAssert.assertEquals("""{
"webhookMessage": {
"message": {
"chat": {
"id": "20000000"
},
"text": "Hello!"
}
},
"error": {
"code": "content_policy_violation",
"message": "Your request was rejected as a result of our safety system."
}
}""", message.worth as String, false)
}
Key steps:
- Wait for partition assignment before starting the test scenario.
- Mock the requests to OpenAI and Telegram.
- Execute the test scenario.
- Wait for offset commit confirmation.
- Verify the requests to OpenAI and Telegram.
- Check the message in Kafka.
Using JSONAssert.assertEquals ensures consistency of data representation across Kafka messages, logs, and tests. This simplifies testing by providing flexibility in comparison and accuracy in error diagnosis: the final false argument enables lenient mode, so extra fields in the actual JSON do not fail the assertion.
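For instance, in lenient mode the following comparison passes even though the actual document carries an extra field (the traceId field is made up for this illustration):

JSONAssert.assertEquals(
        "{\"text\": \"Hello!\"}",
        "{\"text\": \"Hello!\", \"traceId\": \"abc-123\"}",
        false); // lenient mode: extra fields in the actual JSON are tolerated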
The article provides an example with the JSON message format; other formats are not covered, but the described approach does not impose format restrictions.
How To Find Your Message in RecordCaptor
Messages in RecordCaptor are organized by topic name and key. In the test above, the key used is the Kafka message key. When sending, we specify it explicitly:
sendMessage("topicC", chatId, ...);
...
non-public void sendMessage(String subject, String key, Object payload) {
Message message = MessageBuilder
.withPayload(objectMapper.writeValueAsString(payload))
.setHeader(KafkaHeaders.TOPIC, subject)
.setHeader(KafkaHeaders.KEY, key)
To search by message key within a topic:
when:
def message = recordCaptor.getRecords("topicC", "20000000").last
If this option does not suit you, you will need to define your own indexes based on message parameters to drive the search. An example can be seen in the tests PolicyViolationTestsCustomIndex.groovy.
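Purely as an illustration (the actual PolicyViolationTestsCustomIndex.groovy may be organized differently), such an index can be a simple map keyed by a field extracted from the message payload; every name below is made up for the example.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical custom index: groups captured JSON payloads by the "error.code" field.
public class ErrorCodeIndex {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Map<String, List<String>> payloadsByErrorCode = new ConcurrentHashMap<>();

    public void add(String payload) throws Exception {
        JsonNode root = objectMapper.readTree(payload);
        String errorCode = root.path("error").path("code").asText("unknown");
        payloadsByErrorCode
                .computeIfAbsent(errorCode, code -> new CopyOnWriteArrayList<>())
                .add(payload);
    }

    public List<String> get(String errorCode) {
        return payloadsByErrorCode.getOrDefault(errorCode, List.of());
    }
}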
Connecting RecordCaptor
The code for connecting RecordCaptor looks as follows:
@TestConfiguration(proxyBeanMethods = false)
public class RecordCaptorConfiguration {

    @Bean
    RecordCaptor recordCaptor() {
        return new RecordCaptor();
    }

    @Bean
    RecordCaptorConsumer recordCaptorConsumer(RecordCaptor recordCaptor) {
        return new RecordCaptorConsumer(recordCaptor, new RecordSnapshotMapper());
    }
}
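In a test class, this configuration is wired in the usual Spring way; the class name below is only a sketch of one possible setup.

// Hypothetical wiring of the test configuration into an integration test.
@SpringBootTest
@Import(RecordCaptorConfiguration.class)
class KafkaInteractionTest {

    @Autowired
    RecordCaptor recordCaptor; // used in the verification stage via getRecords(topic, key)
}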
OffsetSnapshotFrame
Experience has shown that working with Kafka-based applications calls for tools that help you understand the state of consumers and the status of message consumption. For this task, you can compare topic offsets with consumer group offsets inside the offset-commit waiting operation and log the discrepancies, as illustrated in the image:
The code for OffsetComparisonFrame is available for review.
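The idea boils down to comparing the offsets a consumer group has committed with the end offsets of the topic partitions. A rough sketch using the Kafka AdminClient is shown below; the class and method names are assumptions, and the real OffsetComparisonFrame is more elaborate.

import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch: log partitions where the group's committed offset lags the end offset.
public class OffsetComparisonSketch {

    public void logLag(AdminClient adminClient, String groupId) throws Exception {
        // Offsets the group has committed so far, per partition.
        Map<TopicPartition, OffsetAndMetadata> committed = adminClient
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata()
                .get();

        // Current end offsets of the same partitions.
        Map<TopicPartition, OffsetSpec> latest = committed.keySet().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                adminClient.listOffsets(latest).all().get();

        endOffsets.forEach((tp, end) -> {
            OffsetAndMetadata meta = committed.get(tp);
            if (meta != null && meta.offset() < end.offset()) {
                System.out.printf("Lag on %s: committed=%d, end=%d%n", tp, meta.offset(), end.offset());
            }
        });
    }
}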
Conclusion
Testing messages in Kafka using the proposed approach not only simplifies test writing but also makes it more structured and understandable. Using tools like RecordCaptor, along with adhering to isolation principles and a clear separation of test stages, ensures high accuracy and efficiency.
Thank you for reading the article, and good luck in your efforts to write effective and clear tests!