From a61a3ac89c60811ec14de67f2c4af809a44924d4 Mon Sep 17 00:00:00 2001 From: kaushal Date: Sat, 3 Jul 2021 18:08:35 +0530 Subject: [PATCH 1/6] BAEL-4905: Send large messages with Kafka --- .../kafka/KafkaApplicationLongMessage.java | 75 +++++++++++++++++++ .../spring/kafka/KafkaConsumerConfig.java | 7 ++ .../spring/kafka/KafkaProducerConfig.java | 2 + .../spring/kafka/KafkaTopicConfig.java | 12 +++ .../src/main/resources/application.properties | 1 + 5 files changed, 97 insertions(+) create mode 100644 spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplicationLongMessage.java diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplicationLongMessage.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplicationLongMessage.java new file mode 100644 index 000000000000..0af0a4b091bf --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplicationLongMessage.java @@ -0,0 +1,75 @@ +package com.baeldung.spring.kafka; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +@SpringBootApplication +public class KafkaApplicationLongMessage { + + public static void main(String[] args) throws Exception { + + ConfigurableApplicationContext context = SpringApplication.run(KafkaApplicationLongMessage.class, args); + + LongMessageProducer producer = context.getBean(LongMessageProducer.class); + + String fileData = readLongMessage(); + producer.sendMessage(fileData); + + //Deliberate delay to let listener consume produced message before main thread stops + Thread.sleep(5000); + context.close(); + } + + private static String readLongMessage() throws IOException { + String data = ""; + + //update complete location of large message here + data = new String(Files.readAllBytes(Paths.get("RandomTextFile.txt"))); + return data; + } + + @Bean + public LongMessageProducer longMessageProducer() { + return new LongMessageProducer(); + } + + @Bean + public LongMessageListener longMessageListener() { + return new LongMessageListener(); + } + + public static class LongMessageProducer { + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Value(value = "${long.message.topic.name}") + private String topicName; + + public void sendMessage(String message) { + kafkaTemplate.send(topicName, message); + System.out.println("Long message Sent"); + } + + } + + public static class LongMessageListener { + + @KafkaListener(topics = "${long.message.topic.name}", groupId = "longMessage", containerFactory = "longMessageKafkaListenerContainerFactory") + public void listenGroupLongMessage(String message) { + System.out.println("Received Message in group 'longMessage'"); + } + + } + +} diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java index abaa431eec9e..9495fcf50864 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java @@ -27,6 +27,8 @@ public ConsumerFactory consumerFactory(String groupId) { props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520"); + props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520"); return new DefaultKafkaConsumerFactory<>(props); } @@ -56,6 +58,11 @@ public ConcurrentKafkaListenerContainerFactory partitionsKafkaLi return kafkaListenerContainerFactory("partitions"); } + @Bean + public ConcurrentKafkaListenerContainerFactory longMessageKafkaListenerContainerFactory() { + return kafkaListenerContainerFactory("longMessage"); + } + @Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = kafkaListenerContainerFactory("filter"); diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java index 0223bab0fe04..9dff81a09db0 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java @@ -25,6 +25,8 @@ public ProducerFactory producerFactory() { configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520"); + return new DefaultKafkaProducerFactory<>(configProps); } diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java index 00e4147cd0c6..8a006a72bc3f 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java @@ -19,6 +19,9 @@ public class KafkaTopicConfig { @Value(value = "${message.topic.name}") private String topicName; + @Value(value = "${long.message.topic.name}") + private String longMsgTopicName; + @Value(value = "${partitioned.topic.name}") private String partitionedTopicName; @@ -54,4 +57,13 @@ public NewTopic topic3() { public NewTopic topic4() { return new NewTopic(greetingTopicName, 1, (short) 1); } + + @Bean + public NewTopic topic5() { + NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1); + Map configs = new HashMap<>(); + configs.put("max.message.bytes", "20971520"); + newTopic.configs(configs); + return newTopic; + } } diff --git a/spring-kafka/src/main/resources/application.properties b/spring-kafka/src/main/resources/application.properties index e6a4668da36c..e1a983339b27 100644 --- a/spring-kafka/src/main/resources/application.properties +++ b/spring-kafka/src/main/resources/application.properties @@ -1,5 +1,6 @@ kafka.bootstrapAddress=localhost:9092 message.topic.name=baeldung +long.message.topic.name=longMessage greeting.topic.name=greeting filtered.topic.name=filtered partitioned.topic.name=partitioned From fccbe7e4acf6a74b6108a5643574e3e774d4b77b Mon Sep 17 00:00:00 2001 From: kaushal Date: Sat, 25 Dec 2021 22:05:05 +0530 Subject: [PATCH 2/6] Bael-5212 Guide to Spectator --- metrics/pom.xml | 2 +- .../spectator/SpectatorMetersTest.java | 221 ++++++++++++++++++ .../java/com/baeldung/mono/MonoUnitTest.java | 57 ++++- .../src/test/java/encryption/Encrypt.java | 208 +++++++++++++++++ 4 files changed, 486 insertions(+), 2 deletions(-) create mode 100644 metrics/src/test/java/com/baeldung/metrics/spectator/SpectatorMetersTest.java create mode 100644 reactor-core/src/test/java/encryption/Encrypt.java diff --git a/metrics/pom.xml b/metrics/pom.xml index 6ac1761ca056..37b10ef4845c 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -78,7 +78,7 @@ com.netflix.spectator spectator-api - 0.132.0 + 1.0.11 diff --git a/metrics/src/test/java/com/baeldung/metrics/spectator/SpectatorMetersTest.java b/metrics/src/test/java/com/baeldung/metrics/spectator/SpectatorMetersTest.java new file mode 100644 index 000000000000..a73d6db608fd --- /dev/null +++ b/metrics/src/test/java/com/baeldung/metrics/spectator/SpectatorMetersTest.java @@ -0,0 +1,221 @@ +package com.baeldung.metrics.spectator; + +import com.netflix.spectator.api.*; +import com.netflix.spectator.api.patterns.LongTaskTimer; +import com.netflix.spectator.api.patterns.PolledMeter; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class SpectatorMetersTest { + @Test + public void spectatorCounterTest(){ + + class MyListProcessor { + private final Counter insertCounter; + private final Counter removeCounter; + List requestList = new ArrayList(); + + private MyListProcessor(Registry registry){ + insertCounter = registry.counter("list.insert.count"); + removeCounter = registry.counter("list.remove.count"); + } + + private void addToList(String element){ + requestList.add(element); + insertCounter.increment(); + } + + private void removeFromList(){ + requestList.remove(0); + removeCounter.increment(); + } + } + + MyListProcessor myListProcessor = new MyListProcessor(new DefaultRegistry()); + myListProcessor.addToList("element1"); + myListProcessor.addToList("element2"); + myListProcessor.addToList("element3"); + myListProcessor.removeFromList(); + + assertEquals(3, myListProcessor.insertCounter.count()); + assertEquals(1, myListProcessor.removeCounter.count()); + + } + + @Test + public void spectatorTimerTest() throws Exception { + + class MyRequestProcessor { + private final Timer requestLatency; + + private MyRequestProcessor(Registry registry) { + requestLatency = registry.timer("app.request.latency"); + } + + private String processRequest(int input) throws Exception { + return requestLatency.record(() -> handleRequest(input)); + + } + + private String handleRequest(int input) throws InterruptedException { + try { + Thread.sleep(input); + return "Done"; + } catch (InterruptedException e) { + e.printStackTrace(); + throw e; + } + } + } + + MyRequestProcessor myRequestProcessor = new MyRequestProcessor(new DefaultRegistry()); + myRequestProcessor.processRequest(3000); + assertEquals(1, myRequestProcessor.requestLatency.count()); + assertThat(myRequestProcessor.requestLatency.totalTime()).isBetween(3000000000L, 4000000000L); + } + + @Test + public void spectatorLongTaskTimerTest() throws Exception { + + class MyRequestProcessor { + private final LongTaskTimer refreshDuration; + private long duration; + + private MyRequestProcessor(Registry registry) { + refreshDuration = LongTaskTimer.get(registry, registry.createId("metadata.refreshDuration")); + } + + private String processRequest(int input) throws Exception { + final long taskId = refreshDuration.start(); + try { + Thread.sleep(input); + return "Done"; + + } catch (InterruptedException e) { + e.printStackTrace(); + throw e; + } finally { + refreshDuration.stop(taskId); + } + + } + } + + MyRequestProcessor myRequestProcessor = new MyRequestProcessor(new DefaultRegistry()); + myRequestProcessor.processRequest(3000); + System.out.println(myRequestProcessor.refreshDuration.measure()); + System.out.println(myRequestProcessor.duration); + System.out.println(myRequestProcessor.refreshDuration.duration()); + } + + @Test + public void spectatorGauges_polledMeter_Test(){ + class MyList { + + private List list = new ArrayList();; + private AtomicInteger listSize = new AtomicInteger(0); + + private MyList(Registry registry) { + PolledMeter.using(registry) + .withName("list.size") + .monitorValue(listSize); + } + + private void addToList(String element){ + list.add(element); + listSize.incrementAndGet(); + } + private void removeFromList(){ + list.remove(0); + listSize.decrementAndGet(); + } + private int size(){ + return list.size(); + } + } + + MyList myList = new MyList(new DefaultRegistry()); + myList.addToList("element1"); + myList.addToList("element2"); + myList.addToList("element3"); + myList.addToList("element4"); + myList.removeFromList(); + assertEquals(myList.size(), myList.listSize.get()); + } + + @Test + public void spectatorGauges_ActiveGauges_Test(){ + class MyList { + + private List list = new ArrayList();; + private AtomicInteger listSize = new AtomicInteger(0); + private Gauge gauge; + + private MyList(Registry registry) { + gauge = registry.gauge("list.size"); + } + + private void addToList(String element){ + list.add(element); + listSize.incrementAndGet(); + gauge.set(listSize.get()); + } + private void removeFromList(){ + list.remove(0); + listSize.decrementAndGet(); + gauge.set(listSize.get()); + } + private int size(){ + return list.size(); + } + } + + MyList myList = new MyList(new DefaultRegistry()); + myList.addToList("element1"); + myList.addToList("element2"); + myList.addToList("element3"); + myList.addToList("element4"); + myList.removeFromList(); + assertEquals(3.0, myList.gauge.value()); + } + + @Test + public void spectatorDistributionSummaryTest() throws Exception { + + class MyRequestProcessor { + private final DistributionSummary distributionSummary; + + private MyRequestProcessor(Registry registry) { + distributionSummary = registry.distributionSummary("app.request.size"); + } + + private void processRequest(String input) throws Exception { + distributionSummary.record((long) input.length()); + handleRequest(); + } + + private void handleRequest() throws InterruptedException { + try { + Thread.sleep(3000); + return; + } catch (InterruptedException e) { + e.printStackTrace(); + throw e; + } + } + } + + MyRequestProcessor myRequestProcessor = new MyRequestProcessor(new DefaultRegistry()); + myRequestProcessor.processRequest("This is my sample input."); + assertEquals(1, myRequestProcessor.distributionSummary.count()); + assertEquals("This is my sample input.".length(), (int) myRequestProcessor.distributionSummary.totalAmount()); + } + +} + diff --git a/reactor-core/src/test/java/com/baeldung/mono/MonoUnitTest.java b/reactor-core/src/test/java/com/baeldung/mono/MonoUnitTest.java index fc42620e8e76..1a126466f886 100644 --- a/reactor-core/src/test/java/com/baeldung/mono/MonoUnitTest.java +++ b/reactor-core/src/test/java/com/baeldung/mono/MonoUnitTest.java @@ -4,6 +4,8 @@ import org.junit.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import java.time.Duration; @@ -11,6 +13,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.junit.Assert.assertEquals; @@ -69,7 +73,7 @@ private Flux monoTofluxUsingFlatMapIterable(Mono> monoList) { private Flux monoTofluxUsingFlatMapMany(Mono> monoList) { return monoList - .flatMapMany(Flux::fromIterable) + .flatMapMany(it -> Flux.fromIterable(it)) .log(); } @@ -159,6 +163,57 @@ private Mono sampleMsg(String str) { log.debug("Call to Retrieve Sample Message!! --> {} at: {}", str, System.currentTimeMillis()); return Mono.just(str); } + + @Test + public void testFluxDelay(){ + + Flux test = Flux.range(1,10) + .publishOn(Schedulers.parallel()) + .log() + .doOnNext(n -> log.debug("first... {}",n)) + .limitRate(4) + .doOnNext(n -> log.debug("second... {}",n)) +// .map(x -> { +// try { +// Thread.sleep(2000L); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// return x; +// }); +// .flatMap(i -> ) +// .delayElements(Duration.ofMillis(3000L), Schedulers.parallel()) + .doOnNext(n -> log.debug("third... {}",n)); +// .delaySubscription(Duration.ofMillis(5000L)); +// .delaySubscription(Flux.just(1,2,3).doOnNext(n -> log.debug("second {}",n)).log()); + + test.toStream().forEach(n -> log.debug("stream....{}",n)); + +// StepVerifier.create(test).expectNext(1,2,3,4,5,6,7,8,9,10).verifyComplete(); + + } + + @Test + public void delayedFlux(){ +// Flux delayedFlux = Flux.interval(Duration.ofSeconds(1)) + Flux delayedFlux = Flux.range(1,100) + .log() + .limitRate(4) + .log() + .map(n -> { + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return n; + }) +// .delayElements(Duration.ofMillis(1000L)) + .doOnNext(n -> log.debug("First...{}",n)); + + + delayedFlux.toStream().forEach(x -> log.debug("final...{}",x)); + } } diff --git a/reactor-core/src/test/java/encryption/Encrypt.java b/reactor-core/src/test/java/encryption/Encrypt.java new file mode 100644 index 000000000000..b55173cd2bc3 --- /dev/null +++ b/reactor-core/src/test/java/encryption/Encrypt.java @@ -0,0 +1,208 @@ +package encryption; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; + +import javax.crypto.Mac; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Base64; + +@Slf4j +public class Encrypt { + + @Test + public void generateHttpSignature() throws InvalidKeyException, NoSuchAlgorithmException { + String httpSignature = String.join(", ", + getSignatureTemplate("keyid", getMerchantKey()), + getSignatureTemplate("algorithm", "HmacSHA256"), + getSignatureTemplate("headers", "host date (request-target) digest v-c-merchant-id"), + getSignatureTemplate("signature", generateSignatureHash())); + + log.debug(httpSignature); + } + + private String getSignatureTemplate(String keyString, String valueString){ + return keyString+ "=\"" + valueString + "\""; + } + + private String getMerchantKey(){ + return "22f596a4-23ae-4031-bcbc-6f25dbfca91b"; + } + + private String generateSignatureHash() throws NoSuchAlgorithmException, InvalidKeyException { + String signatureParam = getSignatureParams(); + String keyString = getSharedSecret(); + byte[] decodedKey = Base64.getDecoder().decode(keyString); + SecretKey originalKey = new SecretKeySpec(decodedKey, 0, decodedKey.length, "HmacSHA256"); + Mac hmacSha256 = Mac.getInstance("HmacSHA256"); + hmacSha256.init(originalKey); + hmacSha256.update(signatureParam.getBytes()); + byte[] HmachSha256DigestBytes = hmacSha256.doFinal(); + return Base64.getEncoder().encodeToString(HmachSha256DigestBytes); + } + + private String getSharedSecret(){ + return "dfp630WMn73xEmd5KRa8/AZcbyLkoE3NHn3/SFAtmc8="; + } + + public String generateDigest() throws NoSuchAlgorithmException { + String payload = getPayload(); + MessageDigest md = MessageDigest.getInstance("SHA-256"); + md.update(payload.getBytes(StandardCharsets.UTF_8)); + byte[] digest = md.digest(); + String computedDigest = "SHA-256=" + Base64.getEncoder().encodeToString(digest); + log.debug("Computed Digest: {}", computedDigest); + return computedDigest; + } + + private String getSignatureParams() throws NoSuchAlgorithmException { + String customSignatureParam = String.join("\n", + getHost(), + getDate(), + getRequestTarget(), + getGeneratedDigest(), + getMerchantId()); + + return customSignatureParam; + } + + private String getHost(){ + return "host: apitest.cybersource.com"; + } + + private String getDate(){ + String date = DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC)); + log.debug(date); + return "date: "+date; + } + private String getRequestTarget(){ + return "(request-target): post /flex/v2/sessions"; + } + + private String getGeneratedDigest() throws NoSuchAlgorithmException { + return "digest: "+ generateDigest(); + } + private String getMerchantId(){ + return "v-c-merchant-id: michaelkors_test_usd"; + } + + private String getPayload(){ + String bodyText = "{\n" + + " \"fields\": {\n" + + " \"paymentInformation\": {\n" + + " \"card\": {\n" + + " \"number\": {},\n" + + " \"securityCode\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"expirationMonth\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"expirationYear\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"type\": {\n" + + " \"required\": false\n" + + " }\n" + + " }\n" + + " },\n" + + " \"orderInformation\": {\n" + + " \"amountDetails\": {\n" + + " \"totalAmount\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"currency\": {\n" + + " \"required\": false\n" + + " }\n" + + " },\n" + + " \"billTo\": {\n" + + " \"address1\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"address2\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"administrativeArea\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"buildingNumber\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"country\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"district\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"locality\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"postalCode\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"email\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"firstName\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"lastName\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"phoneNumber\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"company\": {\n" + + " \"required\": false\n" + + " }\n" + + " },\n" + + " \"shipTo\": {\n" + + " \"address1\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"address2\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"administrativeArea\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"buildingNumber\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"country\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"district\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"locality\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"postalCode\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"firstName\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"lastName\": {\n" + + " \"required\": false\n" + + " },\n" + + " \"company\": {\n" + + " \"required\": false\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + return bodyText; + } +} From 12d7629f3029d6315d09a9a16516fb7d822c7ce2 Mon Sep 17 00:00:00 2001 From: kaushal Date: Sat, 25 Dec 2021 22:19:08 +0530 Subject: [PATCH 3/6] Bael-5212 Guide to Spectator --- .../java/com/baeldung/mono/MonoUnitTest.java | 57 +---- .../src/test/java/encryption/Encrypt.java | 208 ------------------ 2 files changed, 1 insertion(+), 264 deletions(-) delete mode 100644 reactor-core/src/test/java/encryption/Encrypt.java diff --git a/reactor-core/src/test/java/com/baeldung/mono/MonoUnitTest.java b/reactor-core/src/test/java/com/baeldung/mono/MonoUnitTest.java index 1a126466f886..fc42620e8e76 100644 --- a/reactor-core/src/test/java/com/baeldung/mono/MonoUnitTest.java +++ b/reactor-core/src/test/java/com/baeldung/mono/MonoUnitTest.java @@ -4,8 +4,6 @@ import org.junit.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import java.time.Duration; @@ -13,8 +11,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.stream.IntStream; -import java.util.stream.Stream; import static org.junit.Assert.assertEquals; @@ -73,7 +69,7 @@ private Flux monoTofluxUsingFlatMapIterable(Mono> monoList) { private Flux monoTofluxUsingFlatMapMany(Mono> monoList) { return monoList - .flatMapMany(it -> Flux.fromIterable(it)) + .flatMapMany(Flux::fromIterable) .log(); } @@ -163,57 +159,6 @@ private Mono sampleMsg(String str) { log.debug("Call to Retrieve Sample Message!! --> {} at: {}", str, System.currentTimeMillis()); return Mono.just(str); } - - @Test - public void testFluxDelay(){ - - Flux test = Flux.range(1,10) - .publishOn(Schedulers.parallel()) - .log() - .doOnNext(n -> log.debug("first... {}",n)) - .limitRate(4) - .doOnNext(n -> log.debug("second... {}",n)) -// .map(x -> { -// try { -// Thread.sleep(2000L); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// return x; -// }); -// .flatMap(i -> ) -// .delayElements(Duration.ofMillis(3000L), Schedulers.parallel()) - .doOnNext(n -> log.debug("third... {}",n)); -// .delaySubscription(Duration.ofMillis(5000L)); -// .delaySubscription(Flux.just(1,2,3).doOnNext(n -> log.debug("second {}",n)).log()); - - test.toStream().forEach(n -> log.debug("stream....{}",n)); - -// StepVerifier.create(test).expectNext(1,2,3,4,5,6,7,8,9,10).verifyComplete(); - - } - - @Test - public void delayedFlux(){ -// Flux delayedFlux = Flux.interval(Duration.ofSeconds(1)) - Flux delayedFlux = Flux.range(1,100) - .log() - .limitRate(4) - .log() - .map(n -> { - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return n; - }) -// .delayElements(Duration.ofMillis(1000L)) - .doOnNext(n -> log.debug("First...{}",n)); - - - delayedFlux.toStream().forEach(x -> log.debug("final...{}",x)); - } } diff --git a/reactor-core/src/test/java/encryption/Encrypt.java b/reactor-core/src/test/java/encryption/Encrypt.java deleted file mode 100644 index b55173cd2bc3..000000000000 --- a/reactor-core/src/test/java/encryption/Encrypt.java +++ /dev/null @@ -1,208 +0,0 @@ -package encryption; - -import lombok.extern.slf4j.Slf4j; -import org.junit.Test; - -import javax.crypto.Mac; -import javax.crypto.SecretKey; -import javax.crypto.spec.SecretKeySpec; -import java.nio.charset.StandardCharsets; -import java.security.InvalidKeyException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Base64; - -@Slf4j -public class Encrypt { - - @Test - public void generateHttpSignature() throws InvalidKeyException, NoSuchAlgorithmException { - String httpSignature = String.join(", ", - getSignatureTemplate("keyid", getMerchantKey()), - getSignatureTemplate("algorithm", "HmacSHA256"), - getSignatureTemplate("headers", "host date (request-target) digest v-c-merchant-id"), - getSignatureTemplate("signature", generateSignatureHash())); - - log.debug(httpSignature); - } - - private String getSignatureTemplate(String keyString, String valueString){ - return keyString+ "=\"" + valueString + "\""; - } - - private String getMerchantKey(){ - return "22f596a4-23ae-4031-bcbc-6f25dbfca91b"; - } - - private String generateSignatureHash() throws NoSuchAlgorithmException, InvalidKeyException { - String signatureParam = getSignatureParams(); - String keyString = getSharedSecret(); - byte[] decodedKey = Base64.getDecoder().decode(keyString); - SecretKey originalKey = new SecretKeySpec(decodedKey, 0, decodedKey.length, "HmacSHA256"); - Mac hmacSha256 = Mac.getInstance("HmacSHA256"); - hmacSha256.init(originalKey); - hmacSha256.update(signatureParam.getBytes()); - byte[] HmachSha256DigestBytes = hmacSha256.doFinal(); - return Base64.getEncoder().encodeToString(HmachSha256DigestBytes); - } - - private String getSharedSecret(){ - return "dfp630WMn73xEmd5KRa8/AZcbyLkoE3NHn3/SFAtmc8="; - } - - public String generateDigest() throws NoSuchAlgorithmException { - String payload = getPayload(); - MessageDigest md = MessageDigest.getInstance("SHA-256"); - md.update(payload.getBytes(StandardCharsets.UTF_8)); - byte[] digest = md.digest(); - String computedDigest = "SHA-256=" + Base64.getEncoder().encodeToString(digest); - log.debug("Computed Digest: {}", computedDigest); - return computedDigest; - } - - private String getSignatureParams() throws NoSuchAlgorithmException { - String customSignatureParam = String.join("\n", - getHost(), - getDate(), - getRequestTarget(), - getGeneratedDigest(), - getMerchantId()); - - return customSignatureParam; - } - - private String getHost(){ - return "host: apitest.cybersource.com"; - } - - private String getDate(){ - String date = DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC)); - log.debug(date); - return "date: "+date; - } - private String getRequestTarget(){ - return "(request-target): post /flex/v2/sessions"; - } - - private String getGeneratedDigest() throws NoSuchAlgorithmException { - return "digest: "+ generateDigest(); - } - private String getMerchantId(){ - return "v-c-merchant-id: michaelkors_test_usd"; - } - - private String getPayload(){ - String bodyText = "{\n" + - " \"fields\": {\n" + - " \"paymentInformation\": {\n" + - " \"card\": {\n" + - " \"number\": {},\n" + - " \"securityCode\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"expirationMonth\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"expirationYear\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"type\": {\n" + - " \"required\": false\n" + - " }\n" + - " }\n" + - " },\n" + - " \"orderInformation\": {\n" + - " \"amountDetails\": {\n" + - " \"totalAmount\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"currency\": {\n" + - " \"required\": false\n" + - " }\n" + - " },\n" + - " \"billTo\": {\n" + - " \"address1\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"address2\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"administrativeArea\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"buildingNumber\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"country\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"district\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"locality\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"postalCode\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"email\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"firstName\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"lastName\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"phoneNumber\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"company\": {\n" + - " \"required\": false\n" + - " }\n" + - " },\n" + - " \"shipTo\": {\n" + - " \"address1\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"address2\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"administrativeArea\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"buildingNumber\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"country\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"district\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"locality\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"postalCode\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"firstName\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"lastName\": {\n" + - " \"required\": false\n" + - " },\n" + - " \"company\": {\n" + - " \"required\": false\n" + - " }\n" + - " }\n" + - " }\n" + - " }\n" + - "}"; - - return bodyText; - } -} From 042beb3de22288806d17d4eefffc7ae802804aeb Mon Sep 17 00:00:00 2001 From: kaushal Date: Sun, 26 Dec 2021 22:49:00 +0530 Subject: [PATCH 4/6] pom rollback --- metrics/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metrics/pom.xml b/metrics/pom.xml index 37b10ef4845c..49c2627f8c80 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -78,7 +78,7 @@ com.netflix.spectator spectator-api - 1.0.11 + 0.132.0 @@ -92,4 +92,4 @@ 1.1.0 - \ No newline at end of file + From de4333cbf1f951762d49c04c83aaeaac10470f06 Mon Sep 17 00:00:00 2001 From: kaushal Date: Sun, 26 Dec 2021 22:56:27 +0530 Subject: [PATCH 5/6] UnitTest Class name update --- .../{SpectatorMetersTest.java => SpectatorMetersUnitTest.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename metrics/src/test/java/com/baeldung/metrics/spectator/{SpectatorMetersTest.java => SpectatorMetersUnitTest.java} (99%) diff --git a/metrics/src/test/java/com/baeldung/metrics/spectator/SpectatorMetersTest.java b/metrics/src/test/java/com/baeldung/metrics/spectator/SpectatorMetersUnitTest.java similarity index 99% rename from metrics/src/test/java/com/baeldung/metrics/spectator/SpectatorMetersTest.java rename to metrics/src/test/java/com/baeldung/metrics/spectator/SpectatorMetersUnitTest.java index a73d6db608fd..d27ffec6afc3 100644 --- a/metrics/src/test/java/com/baeldung/metrics/spectator/SpectatorMetersTest.java +++ b/metrics/src/test/java/com/baeldung/metrics/spectator/SpectatorMetersUnitTest.java @@ -12,7 +12,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -public class SpectatorMetersTest { +public class SpectatorMetersUnitTest { @Test public void spectatorCounterTest(){ From 0875da363e5e6df8b4ad8eba45818a484d0eac16 Mon Sep 17 00:00:00 2001 From: kaushal Date: Sun, 26 Dec 2021 23:01:24 +0530 Subject: [PATCH 6/6] Pom update --- metrics/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/pom.xml b/metrics/pom.xml index 49c2627f8c80..abdfb14dc6b7 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -78,7 +78,7 @@ com.netflix.spectator spectator-api - 0.132.0 + 1.0.11