Mockito 详解:
https://pdai.tech/md/develop/ut/dev-ut-x-mockito.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
if (partialTopic != null) {
KStream<String, GenericRecord>[] partialBranch = stream
.branch((k, v) -> {
Map<Utf8, Utf8> metrics = Optional.ofNullable((Map) v.get(METRIC)).orElse(new HashMap<>());
return metrics.containsKey(new Utf8(PARTIAL));
}
, (k, v) -> true);
partialBranch[0]
.process(() -> new Processor<String, GenericRecord, String, GenericRecord>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Record<String, GenericRecord> record) {
context.forward(record.withTimestamp(context.currentSystemTimeMs()));
}
})
.to(partialTopic);
stream = partialBranch[1];
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
@Override
@SuppressWarnings("unchecked")
public KStream<String, GenericRecord> apply(KStream<String, GenericRecord> stream) {
KStream<String, GenericRecord>[] postBranch = stream
.transform(() -> new MappingTransformer(configsBySchemaName))
.branch((k, v) -> isErrorRecord(v), (k, v) -> true);
if (mappingStageConfig.hasErrorTopic()) {
postBranch[0].mapValues(this::unmarkErrorRecord).to(mappingStageConfig.errorTopic);
}
return postBranch[1];
}
@Test
public void testProduceWithAvroSerializer() throws NoSuchFieldException, IllegalAccessException {
// Set up the test data
settings = this.settings();
SinkRecord sinkRecord = getTestRecord();
List<SinkRecord> sinkRecords = Collections.singletonList(sinkRecord);
AvroData avroData = mock(AvroData.class);
java.lang.reflect.Field avroDataField = KafkaSinkTask.class.getDeclaredField("avroData");
avroDataField.setAccessible(true);
avroDataField.set(task, avroData);
task.start(settings);
task.put(sinkRecords);
// Mock the necessary dependencies
when(config.getOutputTopic()).thenReturn("outputTopic");
when(avroProducer.send(any(ProducerRecord.class))).thenReturn("hello");
// Call the method under test
// verify(avroProducer, times(1)).send(any());
Assertions.assertEquals(avroProducer.send(any()), null);
}
### lambda 表達式:
https://zhuanlan.zhihu.com/p/112771403
//lambda表达式写法,省略接口和方法名
//Lambda 表达式的语法形式如下:(参数列表) -> { 方法体 }
// 匿名内部类可以是接口、抽象类或具体类
|