diff --git a/.github/workflows/jacoco_report.yml b/.github/workflows/jacoco_report.yml index 0f3157b95..7f231b775 100644 --- a/.github/workflows/jacoco_report.yml +++ b/.github/workflows/jacoco_report.yml @@ -109,14 +109,14 @@ jobs: - name: Get the Coverage info if: steps.jacocorun.outcome == 'success' run: | - echo "Total agent module coverage ${{ steps.jacoco-agent.outputs.coverage-overall }}" - echo "Changed Files coverage ${{ steps.jacoco-agent.outputs.coverage-changed-files }}" - echo "Total agent module coverage ${{ steps.jacoco-reader.outputs.coverage-overall }}" - echo "Changed Files coverage ${{ steps.jacoco-reader.outputs.coverage-changed-files }}" - echo "Total model module coverage ${{ steps.jacoco-model.outputs.coverage-overall }}" - echo "Changed Files coverage ${{ steps.jacoco-model.outputs.coverage-changed-files }}" - echo "Total server module coverage ${{ steps.jacoco-server.outputs.coverage-overall }}" - echo "Changed Files coverage ${{ steps.jacoco-server.outputs.coverage-changed-files }}" + echo "Total 'agent' module coverage ${{ steps.jacoco-agent.outputs.coverage-overall }}" + echo "Changed files of 'agent' module coverage ${{ steps.jacoco-agent.outputs.coverage-changed-files }}" + echo "Total 'reader' module coverage ${{ steps.jacoco-reader.outputs.coverage-overall }}" + echo "Changed files of 'reader' module coverage ${{ steps.jacoco-reader.outputs.coverage-changed-files }}" + echo "Total 'model' module coverage ${{ steps.jacoco-model.outputs.coverage-overall }}" + echo "Changed files of 'model' module coverage ${{ steps.jacoco-model.outputs.coverage-changed-files }}" + echo "Total 'server' module coverage ${{ steps.jacoco-server.outputs.coverage-overall }}" + echo "Changed files of 'server' module coverage ${{ steps.jacoco-server.outputs.coverage-changed-files }}" - name: Fail PR if changed files coverage is less than ${{ env.coverage-changed-files }}% if: steps.jacocorun.outcome == 'success' uses: actions/github-script@v6 diff --git a/.github/workflows/test_filenames_check.yml b/.github/workflows/test_filenames_check.yml index 03d509cd1..b0cf8b7bf 100644 --- a/.github/workflows/test_filenames_check.yml +++ b/.github/workflows/test_filenames_check.yml @@ -41,6 +41,7 @@ jobs: excludes: | server/src/test/scala/za/co/absa/atum/server/api/TestData.scala, server/src/test/scala/za/co/absa/atum/server/api/TestTransactorProvider.scala, - server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala + server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala, + model/src/test/scala/za/co/absa/atum/testing/* verbose-logging: 'false' fail-on-violation: 'true' diff --git a/README.md b/README.md index 5703b5954..cf563701e 100644 --- a/README.md +++ b/README.md @@ -205,7 +205,6 @@ We can even say, that `Checkpoint` is a result of particular `Measurements` (ver The journey of a dataset throughout various data transformations and pipelines. It captures the whole journey, even if it involves multiple applications or ETL pipelines. - ## Usage ### Atum Agent routines @@ -247,6 +246,7 @@ Code coverage wil be generated on path: To make this project runnable via IntelliJ, do the following: - Make sure that your configuration in `server/src/main/resources/reference.conf` is configured according to your needs +- When building within an IDE sure to have the option `-language:higherKinds` on in the compiler options, as it's often not picked up from the SBT project settings. ## How to Run Tests diff --git a/agent/README.md b/agent/README.md index 0d520106d..ed4247d2a 100644 --- a/agent/README.md +++ b/agent/README.md @@ -7,26 +7,26 @@ ## Usage -Create multiple `AtumContext` with different control measures to be applied +Create multiple `AtumContext` with different control measures to be applied ### Option 1 ```scala val atumContextInstanceWithRecordCount = AtumContext(processor = processor) - .withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, measuredColumn = "id")) + .withMeasureAdded(RecordCount(MockMeasureNames.recordCount1)) val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount .withMeasureAdded(AbsSumOfValuesOfColumn(measuredColumn = "salary")) ``` -### Option 2 +### Option 2 Use `AtumPartitions` to get an `AtumContext` from the service using the `AtumAgent`. ```scala val atumContext1 = AtumAgent.createAtumContext(atumPartition) ``` #### AtumPartitions -A list of key values that maintains the order of arrival of the items, the `AtumService` -is able to deliver the correct `AtumContext` according to the `AtumPartitions` we give it. +A list of key values that maintains the order of arrival of the items, the `AtumService` +is able to deliver the correct `AtumContext` according to the `AtumPartitions` we give it. ```scala val atumPartitions = AtumPartitions().withPartitions(ListMap("name" -> "partition-name", "country" -> "SA", "gender" -> "female" )) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala index 32e4d9ec8..8e9dba60d 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala @@ -17,9 +17,10 @@ package za.co.absa.atum.agent import com.typesafe.config.{Config, ConfigFactory} -import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.dispatcher.{CapturingDispatcher, ConsoleDispatcher, Dispatcher, HttpDispatcher} import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointDTO, PartitioningSubmitDTO} +import za.co.absa.atum.model.types.basic.AtumPartitions +import za.co.absa.atum.model.types.basic.AtumPartitionsOps /** * Entity that communicate with the API, primarily focused on spawning Atum Context(s). @@ -58,7 +59,7 @@ trait AtumAgent { atumPartitions: AtumPartitions, additionalDataPatchDTO: AdditionalDataPatchDTO ): AdditionalDataDTO = { - dispatcher.updateAdditionalData(AtumPartitions.toSeqPartitionDTO(atumPartitions), additionalDataPatchDTO) + dispatcher.updateAdditionalData(atumPartitions.toPartitioningDTO, additionalDataPatchDTO) } /** @@ -75,7 +76,7 @@ trait AtumAgent { */ def getOrCreateAtumContext(atumPartitions: AtumPartitions): AtumContext = { val authorIfNew = AtumAgent.currentUser - val partitioningDTO = PartitioningSubmitDTO(AtumPartitions.toSeqPartitionDTO(atumPartitions), None, authorIfNew) + val partitioningDTO = PartitioningSubmitDTO(atumPartitions.toPartitioningDTO, None, authorIfNew) val atumContextDTO = dispatcher.createPartitioning(partitioningDTO) val atumContext = AtumContext.fromDTO(atumContextDTO, this) @@ -94,8 +95,8 @@ trait AtumAgent { val authorIfNew = AtumAgent.currentUser val newPartitions: AtumPartitions = parentAtumContext.atumPartitions ++ subPartitions - val newPartitionsDTO = AtumPartitions.toSeqPartitionDTO(newPartitions) - val parentPartitionsDTO = Some(AtumPartitions.toSeqPartitionDTO(parentAtumContext.atumPartitions)) + val newPartitionsDTO = newPartitions.toPartitioningDTO + val parentPartitionsDTO = Some(parentAtumContext.atumPartitions.toPartitioningDTO) val partitioningDTO = PartitioningSubmitDTO(newPartitionsDTO, parentPartitionsDTO, authorIfNew) val atumContextDTO = dispatcher.createPartitioning(partitioningDTO) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala index 66386b3be..f5f9a8591 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala @@ -17,14 +17,13 @@ package za.co.absa.atum.agent import org.apache.spark.sql.DataFrame -import za.co.absa.atum.agent.AtumContext.{AdditionalData, AtumPartitions} import za.co.absa.atum.agent.exception.AtumAgentException.PartitioningUpdateException import za.co.absa.atum.agent.model._ import za.co.absa.atum.model.dto._ +import za.co.absa.atum.model.types.basic.{AdditionalData, AtumPartitions, AtumPartitionsOps, PartitioningDTOOps} import java.time.ZonedDateTime import java.util.UUID -import scala.collection.immutable.ListMap /** * This class provides the methods to measure Spark `Dataframe`. Also allows to add and remove measures. @@ -91,7 +90,7 @@ class AtumContext private[agent] ( name = checkpointName, author = agent.currentUser, measuredByAtumAgent = true, - partitioning = AtumPartitions.toSeqPartitionDTO(atumPartitions), + partitioning = atumPartitions.toPartitioningDTO, processStartTime = startTime, processEndTime = Some(endTime), measurements = measurementDTOs @@ -115,7 +114,7 @@ class AtumContext private[agent] ( id = UUID.randomUUID(), name = checkpointName, author = agent.currentUser, - partitioning = AtumPartitions.toSeqPartitionDTO(atumPartitions), + partitioning = atumPartitions.toPartitioningDTO, processStartTime = dateTimeNow, processEndTime = Some(dateTimeNow), measurements = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements) @@ -206,36 +205,10 @@ class AtumContext private[agent] ( } object AtumContext { - /** - * Type alias for Atum partitions. - */ - type AtumPartitions = ListMap[String, String] - type AdditionalData = Map[String, Option[String]] - - /** - * Object contains helper methods to work with Atum partitions. - */ - object AtumPartitions { - def apply(elems: (String, String)): AtumPartitions = { - ListMap(elems) - } - - def apply(elems: List[(String, String)]): AtumPartitions = { - ListMap(elems:_*) - } - - private[agent] def toSeqPartitionDTO(atumPartitions: AtumPartitions): PartitioningDTO = { - atumPartitions.map { case (key, value) => PartitionDTO(key, value) }.toSeq - } - - private[agent] def fromPartitioning(partitioning: PartitioningDTO): AtumPartitions = { - AtumPartitions(partitioning.map(partition => Tuple2(partition.key, partition.value)).toList) - } - } private[agent] def fromDTO(atumContextDTO: AtumContextDTO, agent: AtumAgent): AtumContext = { new AtumContext( - AtumPartitions.fromPartitioning(atumContextDTO.partitioning), + atumContextDTO.partitioning.toAtumPartitions, agent, MeasuresBuilder.mapToMeasures(atumContextDTO.measures), atumContextDTO.additionalData diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumAgentUnitTests.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumAgentUnitTests.scala index 79613e91a..b2cb8c0ca 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumAgentUnitTests.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumAgentUnitTests.scala @@ -18,8 +18,8 @@ package za.co.absa.atum.agent import com.typesafe.config.{Config, ConfigException, ConfigFactory, ConfigValueFactory} import org.scalatest.funsuite.AnyFunSuiteLike -import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.dispatcher.{CapturingDispatcher, ConsoleDispatcher, HttpDispatcher} +import za.co.absa.atum.model.types.basic.AtumPartitions class AtumAgentUnitTests extends AnyFunSuiteLike { diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextUnitTests.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextUnitTests.scala index 75585f485..ba18377d2 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextUnitTests.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextUnitTests.scala @@ -22,11 +22,11 @@ import org.mockito.ArgumentCaptor import org.mockito.Mockito.{mock, times, verify, when} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model.AtumMeasure.{RecordCount, SumOfValuesOfColumn} import za.co.absa.atum.agent.model.{Measure, MeasureResult, MeasurementBuilder, UnknownMeasure} import za.co.absa.atum.model.ResultValueType import za.co.absa.atum.model.dto.CheckpointDTO +import za.co.absa.atum.model.types.basic._ class AtumContextUnitTests extends AnyFlatSpec with Matchers { @@ -95,12 +95,12 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers { val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent).saveCheckpoint(argument.capture()) - - assert(argument.getValue.name == "testCheckpoint") - assert(argument.getValue.author == authorTest) - assert(argument.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions)) - assert(argument.getValue.measurements.head.result.mainValue.value == "3") - assert(argument.getValue.measurements.head.result.mainValue.valueType == ResultValueType.LongValue) + val value: CheckpointDTO = argument.getValue + assert(value.name == "testCheckpoint") + assert(value.author == authorTest) + assert(value.partitioning == atumPartitions.toPartitioningDTO) + assert(value.measurements.head.result.mainValue.value == "3") + assert(value.measurements.head.result.mainValue.valueType == ResultValueType.LongValue) } "createCheckpointOnProvidedData" should "create a Checkpoint on provided data" in { @@ -123,13 +123,14 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers { val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent).saveCheckpoint(argument.capture()) - - assert(argument.getValue.name == "name") - assert(argument.getValue.author == authorTest) - assert(!argument.getValue.measuredByAtumAgent) - assert(argument.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions)) - assert(argument.getValue.processStartTime == argument.getValue.processEndTime.get) - assert(argument.getValue.measurements == MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)) + val value: CheckpointDTO = argument.getValue + + assert(value.name == "name") + assert(value.author == authorTest) + assert(!value.measuredByAtumAgent) + assert(value.partitioning == atumPartitions.toPartitioningDTO) + assert(value.processStartTime == value.processEndTime.get) + assert(value.measurements == MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)) } "createCheckpoint" should "take measurements and create a Checkpoint, multiple measure changes" in { @@ -167,12 +168,13 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers { val argumentFirst = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent, times(1)).saveCheckpoint(argumentFirst.capture()) + val valueFirst: CheckpointDTO = argumentFirst.getValue - assert(argumentFirst.getValue.name == "checkPointNameCount") - assert(argumentFirst.getValue.author == authorTest) - assert(argumentFirst.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions)) - assert(argumentFirst.getValue.measurements.head.result.mainValue.value == "4") - assert(argumentFirst.getValue.measurements.head.result.mainValue.valueType == ResultValueType.LongValue) + assert(valueFirst.name == "checkPointNameCount") + assert(valueFirst.author == authorTest) + assert(valueFirst.partitioning == atumPartitions.toPartitioningDTO) + assert(valueFirst.measurements.head.result.mainValue.value == "4") + assert(valueFirst.measurements.head.result.mainValue.valueType == ResultValueType.LongValue) atumContext.addMeasure(SumOfValuesOfColumn("columnForSum")) when(mockAgent.currentUser).thenReturn(authorTest + "Another") // maybe a process changed the author / current user @@ -180,12 +182,13 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers { val argumentSecond = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent, times(2)).saveCheckpoint(argumentSecond.capture()) + val valueSecond: CheckpointDTO = argumentSecond.getValue - assert(argumentSecond.getValue.name == "checkPointNameSum") - assert(argumentSecond.getValue.author == authorTest + "Another") - assert(argumentSecond.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions)) - assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.value == "22.5") - assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimalValue) + assert(valueSecond.name == "checkPointNameSum") + assert(valueSecond.author == authorTest + "Another") + assert(valueSecond.partitioning == atumPartitions.toPartitioningDTO) + assert(valueSecond.measurements.tail.head.result.mainValue.value == "22.5") + assert(valueSecond.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimalValue) } "addAdditionalData" should "add key/value pair to map for additional data" in { diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/AtumMeasureUnitTests.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/AtumMeasureUnitTests.scala index 7f2278f91..5c3ff2b88 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/AtumMeasureUnitTests.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/AtumMeasureUnitTests.scala @@ -21,9 +21,10 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructT import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import za.co.absa.atum.agent.AtumAgent -import za.co.absa.atum.agent.AtumContext.{AtumPartitions, DatasetWrapper} +import za.co.absa.atum.agent.AtumContext.DatasetWrapper import za.co.absa.atum.agent.model.AtumMeasure._ import za.co.absa.atum.model.ResultValueType +import za.co.absa.atum.model.types.basic.AtumPartitions import za.co.absa.spark.commons.test.SparkTestBase class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase { self => diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureUnitTests.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureUnitTests.scala index d96d0ac1e..fea11c9f9 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureUnitTests.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureUnitTests.scala @@ -19,11 +19,11 @@ package za.co.absa.atum.agent.model import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import za.co.absa.atum.agent.AtumAgent -import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model.AtumMeasure.{AbsSumOfValuesOfColumn, RecordCount, SumOfHashesOfColumn, SumOfValuesOfColumn} import za.co.absa.spark.commons.test.SparkTestBase import za.co.absa.atum.agent.AtumContext._ import za.co.absa.atum.model.ResultValueType +import za.co.absa.atum.model.types.basic.AtumPartitions class MeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase { self => diff --git a/model/src/main/scala/za/co/absa/atum/model/envelopes/ErrorResponse.scala b/model/src/main/scala/za/co/absa/atum/model/envelopes/ErrorResponse.scala index 038018ac2..5b881c532 100644 --- a/model/src/main/scala/za/co/absa/atum/model/envelopes/ErrorResponse.scala +++ b/model/src/main/scala/za/co/absa/atum/model/envelopes/ErrorResponse.scala @@ -16,19 +16,26 @@ package za.co.absa.atum.model.envelopes +import io.circe.parser.decode import io.circe._ import io.circe.generic.semiauto._ import java.util.UUID object ErrorResponse { - implicit val decodeErrorResponse: Decoder[ErrorResponse] = deriveDecoder - implicit val encodeErrorResponse: Encoder[ErrorResponse] = deriveEncoder + def basedOnStatusCode(statusCode: Int, jsonString: String): Either[Error, ErrorResponse] = { + statusCode match { + case 400 => decode[BadRequestResponse](jsonString) + case 401 => decode[UnauthorizedErrorResponse](jsonString) + case 404 => decode[NotFoundErrorResponse](jsonString) + case 409 => decode[ConflictErrorResponse](jsonString) + case 500 => decode[InternalServerErrorResponse](jsonString) + case _ => decode[GeneralErrorResponse](jsonString) + } + } } -sealed trait ErrorResponse extends ResponseEnvelope { - def message: String -} +sealed trait ErrorResponse extends ResponseEnvelope final case class BadRequestResponse(message: String, requestId: UUID = UUID.randomUUID()) extends ErrorResponse @@ -71,3 +78,11 @@ object ErrorInDataErrorResponse { implicit val decoderInternalServerErrorResponse: Decoder[ErrorInDataErrorResponse] = deriveDecoder implicit val encoderInternalServerErrorResponse: Encoder[ErrorInDataErrorResponse] = deriveEncoder } + +final case class UnauthorizedErrorResponse(message: String, requestId: UUID = UUID.randomUUID()) extends ErrorResponse + +object UnauthorizedErrorResponse { + implicit val decoderInternalServerErrorResponse: Decoder[UnauthorizedErrorResponse] = deriveDecoder + implicit val encoderInternalServerErrorResponse: Encoder[UnauthorizedErrorResponse] = deriveEncoder +} + diff --git a/model/src/main/scala/za/co/absa/atum/model/types/basic.scala b/model/src/main/scala/za/co/absa/atum/model/types/basic.scala new file mode 100644 index 000000000..2631f243c --- /dev/null +++ b/model/src/main/scala/za/co/absa/atum/model/types/basic.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.model.types + +import scala.collection.immutable.ListMap +import za.co.absa.atum.model.dto.{PartitionDTO, PartitioningDTO} + + +object basic { + /** + * Type alias for Atum partitions. + */ + type AtumPartitions = ListMap[String, String] + type AdditionalData = Map[String, Option[String]] + + /** + * Object contains helper methods to work with Atum partitions. + */ + object AtumPartitions { + def apply(elems: (String, String)): AtumPartitions = { + ListMap(elems) + } + + def apply(elems: List[(String, String)]): AtumPartitions = { + ListMap(elems:_*) + } + + } + + implicit class AtumPartitionsOps(val atumPartitions: AtumPartitions) extends AnyVal { + def toPartitioningDTO: PartitioningDTO = { + atumPartitions.map { case (key, value) => PartitionDTO(key, value) }.toSeq + } + } + + implicit class PartitioningDTOOps(val partitioning: PartitioningDTO) extends AnyVal { + def toAtumPartitions: AtumPartitions = { + AtumPartitions(partitioning.map(partition => Tuple2(partition.key, partition.value)).toList) + } + } + +} diff --git a/model/src/main/scala/za/co/absa/atum/model/utils/JsonSyntaxExtensions.scala b/model/src/main/scala/za/co/absa/atum/model/utils/JsonSyntaxExtensions.scala index c892138e7..3f7b7457f 100644 --- a/model/src/main/scala/za/co/absa/atum/model/utils/JsonSyntaxExtensions.scala +++ b/model/src/main/scala/za/co/absa/atum/model/utils/JsonSyntaxExtensions.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.model.utils import io.circe.parser.decode import io.circe.syntax._ -import io.circe.{Decoder, Encoder, parser} +import io.circe.{Decoder, Encoder} import java.util.Base64 @@ -36,16 +36,20 @@ object JsonSyntaxExtensions { implicit class JsonDeserializationSyntax(jsonStr: String) { def as[T: Decoder]: T = { - decode[T](jsonStr) match { + asSafe[T] match { case Right(value) => value - case Left(error) => throw new RuntimeException(s"Failed to decode JSON: $error") + case Left(error) => throw error } } + private def asSafe[T: Decoder]: Either[io.circe.Error, T] = { + decode[T](jsonStr) + } + def fromBase64As[T: Decoder]: Either[io.circe.Error, T] = { val decodedBytes = Base64.getDecoder.decode(jsonStr) val decodedString = new String(decodedBytes, "UTF-8") - parser.decode[T](decodedString) + decode[T](decodedString) } } diff --git a/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsUnitTests.scala b/model/src/test/scala/za/co/absa/atum/model/dto/SerializationUtilsUnitTests.scala similarity index 97% rename from model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsUnitTests.scala rename to model/src/test/scala/za/co/absa/atum/model/dto/SerializationUtilsUnitTests.scala index 729392934..045cb7e3b 100644 --- a/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsUnitTests.scala +++ b/model/src/test/scala/za/co/absa/atum/model/dto/SerializationUtilsUnitTests.scala @@ -14,14 +14,13 @@ * limitations under the License. */ -package za.co.absa.atum.model.utils +package za.co.absa.atum.model.dto import org.scalatest.flatspec.AnyFlatSpecLike import za.co.absa.atum.model.ResultValueType import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue -import za.co.absa.atum.model.dto._ import za.co.absa.atum.model.utils.JsonSyntaxExtensions._ -import za.co.absa.atum.model.utils.SerializationUtilsTest.StringLinearization +import za.co.absa.atum.testing.implicits.StringImplicits.StringLinearization import java.time.{ZoneId, ZoneOffset, ZonedDateTime} import java.util.UUID @@ -436,11 +435,3 @@ class SerializationUtilsUnitTests extends AnyFlatSpecLike { } } - -object SerializationUtilsTest { - implicit class StringLinearization(val str: String) extends AnyVal { - def linearize: String = { - str.stripMargin.replace("\r", "").replace("\n", "") - } - } -} diff --git a/model/src/test/scala/za/co/absa/atum/model/envelopes/ErrorResponseUnitTests.scala b/model/src/test/scala/za/co/absa/atum/model/envelopes/ErrorResponseUnitTests.scala new file mode 100644 index 000000000..b14c3de7a --- /dev/null +++ b/model/src/test/scala/za/co/absa/atum/model/envelopes/ErrorResponseUnitTests.scala @@ -0,0 +1,72 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.model.envelopes + +import io.circe.ParsingFailure +import org.scalatest.funsuite.AnyFunSuiteLike +import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax + +import java.util.UUID + +class ErrorResponseUnitTests extends AnyFunSuiteLike { + test("ErrorResponse.basedOnStatusCode should return correct error response on `Bad Request`") { + val originalError = BadRequestResponse("Bad Request", UUID.randomUUID()) + val errorResponse = ErrorResponse.basedOnStatusCode(400, originalError.asJsonString) + assert(errorResponse == Right(originalError)) + } + + test("ErrorResponse.basedOnStatusCode should return correct error response on `Unauthorized`") { + val originalError = UnauthorizedErrorResponse("Unauthorized", UUID.randomUUID()) + val errorResponse = ErrorResponse.basedOnStatusCode(401, originalError.asJsonString) + assert(errorResponse == Right(originalError)) + } + + test("ErrorResponse.basedOnStatusCode should return correct error response on `Not Found`") { + val originalError = NotFoundErrorResponse("Not Found", UUID.randomUUID()) + val errorResponse = ErrorResponse.basedOnStatusCode(404, originalError.asJsonString) + assert(errorResponse == Right(originalError)) + } + + test("ErrorResponse.basedOnStatusCode should return correct error response on `Conflict`") { + val originalError = ConflictErrorResponse("Conflict", UUID.randomUUID()) + val errorResponse = ErrorResponse.basedOnStatusCode(409, originalError.asJsonString) + assert(errorResponse == Right(originalError)) + } + + test("ErrorResponse.basedOnStatusCode should return correct error response on `Internal Server Error`") { + val originalError = InternalServerErrorResponse("Internal Server Error", UUID.randomUUID()) + val errorResponse = ErrorResponse.basedOnStatusCode(500, originalError.asJsonString) + assert(errorResponse == Right(originalError)) + } + + test("ErrorResponse.basedOnStatusCode should return GeneralErrorResponse on unknown status code") { + val originalError = GeneralErrorResponse("Heluva", UUID.randomUUID()) + val errorResponse = ErrorResponse.basedOnStatusCode(600, originalError.asJsonString) + assert(errorResponse == Right(originalError)) + } + + test("ErrorResponse.basedOnStatusCode fails on invalid JSON") { + val message = "This is not a JSON" + val errorResponse = ErrorResponse.basedOnStatusCode(400, message) + assert(errorResponse.isLeft) + errorResponse.swap.foreach{e => + // investigate the error + assert(e.isInstanceOf[ParsingFailure]) + } + } + +} diff --git a/model/src/test/scala/za/co/absa/atum/model/types/AtumPartitionsUnitTests.scala b/model/src/test/scala/za/co/absa/atum/model/types/AtumPartitionsUnitTests.scala new file mode 100644 index 000000000..11a529d02 --- /dev/null +++ b/model/src/test/scala/za/co/absa/atum/model/types/AtumPartitionsUnitTests.scala @@ -0,0 +1,85 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.model.types + +import org.scalatest.funsuite.AnyFunSuiteLike +import za.co.absa.atum.model.dto.PartitionDTO +import za.co.absa.atum.model.types.basic.AtumPartitions + +import scala.collection.immutable.ListMap + + + +class AtumPartitionsUnitTests extends AnyFunSuiteLike { + test("Creating AtumPartitions from one pair of key-value") { + val expected = ListMap("a" -> "b") + val result = AtumPartitions(("a", "b")) + assert(result == expected) + } + + test("Creating AtumPartitions from multiple key-value pairs") { + val expected = ListMap( + "a" -> "b", + "e" -> "Hello", + "c" -> "d" + ) + val result = AtumPartitions(List( + ("a", "b"), + ("e", "Hello"), + ("c", "d") + )) + assert(result == expected) + assert(result.head == ("a", "b")) + } + + test("Conversion to PartitioningDTO returns expected result") { + import za.co.absa.atum.model.types.basic.AtumPartitionsOps + + val atumPartitions = AtumPartitions(List( + ("a", "b"), + ("e", "Hello"), + ("c", "d") + )) + + val expected = Seq( + PartitionDTO("a", "b"), + PartitionDTO("e", "Hello"), + PartitionDTO("c", "d") + ) + + assert(atumPartitions.toPartitioningDTO == expected) + } + + test("Creating AtumPartitions from PartitioningDTO") { + import za.co.absa.atum.model.types.basic.PartitioningDTOOps + + val partitionDTO = Seq( + PartitionDTO("a", "b"), + PartitionDTO("e", "Hello"), + PartitionDTO("c", "d") + ) + + val expected = AtumPartitions(List( + ("a", "b"), + ("e", "Hello"), + ("c", "d") + )) + + assert(partitionDTO.toAtumPartitions == expected) + } + +} diff --git a/model/src/test/scala/za/co/absa/atum/model/utils/JsonDeserializationSyntaxUnitTests.scala b/model/src/test/scala/za/co/absa/atum/model/utils/JsonDeserializationSyntaxUnitTests.scala new file mode 100644 index 000000000..4ca1ac9df --- /dev/null +++ b/model/src/test/scala/za/co/absa/atum/model/utils/JsonDeserializationSyntaxUnitTests.scala @@ -0,0 +1,98 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.model.utils + +import org.scalatest.funsuite.AnyFunSuiteLike +import za.co.absa.atum.model.dto.{FlowDTO, PartitionDTO} +import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonDeserializationSyntax + +class JsonDeserializationSyntaxUnitTests extends AnyFunSuiteLike { + test("Decode object from Json with defined Option field") { + val source = + """{ + | "id": 1, + | "name": "Test flow", + | "description": "Having description", + | "fromPattern": false + |}""".stripMargin + val result = source.as[FlowDTO] + val expected = FlowDTO( + id = 1, + name = "Test flow", + description = Some("Having description"), + fromPattern = false + ) + assert(result == expected) + } + + test("Decode object from Json with Option field undefined") { + val source = + """{ + | "id": 1, + | "name": "Test flow", + | "fromPattern": true + |}""".stripMargin + val result = source.as[FlowDTO] + val expected = FlowDTO( + id = 1, + name = "Test flow", + description = None, + fromPattern = true + ) + assert(result == expected) + } + + test("Fail when input is not Json") { + val source = "This is not a Json!" + intercept[io.circe.Error] { + source.as[FlowDTO] + } + } + + test("Fail when given wrong class") { + val source = + """{ + | "id": 1, + | "name": "Test flow", + | "description": "Having description", + | "fromPattern": false + |}""".stripMargin + intercept[io.circe.Error] { + source.as[PartitionDTO] + } + } + + + test("Decode object from Base64 string") { + val source = "eyJpZCI6MSwibmFtZSI6IlRlc3QgZmxvdyIsImRlc2NyaXB0aW9uIjpudWxsLCJmcm9tUGF0dGVybiI6ZmFsc2V9" + val result = source.fromBase64As[FlowDTO] + val expected = FlowDTO( + id = 1, + name = "Test flow", + description = None, + fromPattern = false + ) + assert(result == Right(expected)) + } + + test("Failing decode if not Base64 string") { + val source = "" + val result = source.fromBase64As[FlowDTO] + assert(result.isLeft) + } + +} diff --git a/model/src/test/scala/za/co/absa/atum/model/utils/JsonSerializationSyntaxUnitTests.scala b/model/src/test/scala/za/co/absa/atum/model/utils/JsonSerializationSyntaxUnitTests.scala new file mode 100644 index 000000000..830f4e9b7 --- /dev/null +++ b/model/src/test/scala/za/co/absa/atum/model/utils/JsonSerializationSyntaxUnitTests.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.model.utils + +import org.scalatest.funsuite.AnyFunSuiteLike +import za.co.absa.atum.model.dto.FlowDTO +import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax + +class JsonSerializationSyntaxUnitTests extends AnyFunSuiteLike { + test("Converting to Json with option field defined") { + val expected = """{"id":1,"name":"Test flow","description":"Having description","fromPattern":false}""" + val result = FlowDTO( + id = 1, + name = "Test flow", + description = Some("Having description"), + fromPattern = false + ).asJsonString + assert(result == expected) + } + + test("Converting to Json with option field undefined") { + val expected = """{"id":1,"name":"Test flow","description":null,"fromPattern":true}""" + val result = FlowDTO( + id = 1, + name = "Test flow", + description = None, + fromPattern = true + ).asJsonString + assert(result == expected) + } + + test("Converting to Base64") { + val expected = "eyJpZCI6MSwibmFtZSI6IlRlc3QgZmxvdyIsImRlc2NyaXB0aW9uIjpudWxsLCJmcm9tUGF0dGVybiI6ZmFsc2V9" + val result = FlowDTO( + id = 1, + name = "Test flow", + description = None, + fromPattern = false + ).asBase64EncodedJsonString + assert(result == expected) + } + +} diff --git a/model/src/test/scala/za/co/absa/atum/testing/implicits/StringImplicits.scala b/model/src/test/scala/za/co/absa/atum/testing/implicits/StringImplicits.scala new file mode 100644 index 000000000..c513cb77e --- /dev/null +++ b/model/src/test/scala/za/co/absa/atum/testing/implicits/StringImplicits.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.testing.implicits + +object StringImplicits { + implicit class StringLinearization(val str: String) extends AnyVal { + def linearize: String = { + str.stripMargin.replace("\r", "").replace("\n", "") + } + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e9783300e..3538c0e62 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -38,7 +38,7 @@ object Dependencies { val sparkCommons = "0.6.1" - val sttpClient = "3.5.2" + val sttpClient = "3.5.2" //last supported version for Java 8 val sttpCirceJson = "3.9.7" val postgresql = "42.6.0" @@ -64,6 +64,8 @@ object Dependencies { val absaCommons = "2.0.0" + val catsEffect = "3.3.14" + def truncateVersion(version: String, parts: Int): String = { version.split("\\.").take(parts).mkString(".") } @@ -90,9 +92,9 @@ object Dependencies { private def jsonSerdeDependencies: Seq[ModuleID] = { // Circe dependencies - lazy val circeCore = "io.circe" %% "circe-core" % Versions.circeJson - lazy val circeParser = "io.circe" %% "circe-parser" % Versions.circeJson - lazy val circeGeneric = "io.circe" %% "circe-generic" % Versions.circeJson + val circeCore = "io.circe" %% "circe-core" % Versions.circeJson + val circeParser = "io.circe" %% "circe-parser" % Versions.circeJson + val circeGeneric = "io.circe" %% "circe-generic" % Versions.circeJson Seq( circeCore, @@ -236,9 +238,26 @@ object Dependencies { } def readerDependencies(scalaVersion: Version): Seq[ModuleID] = { + val sbtOrg = "com.github.sbt" + val sttpClient3Org = "com.softwaremill.sttp.client3" + val typeLevelOrg = "org.typelevel" + + // STTP core and Circe integration + val sttpCore = sttpClient3Org %% "core" % Versions.sttpClient + val sttpCirce = sttpClient3Org %% "circe" % Versions.sttpClient + + // Cats backend + val catsEffect = typeLevelOrg %% "cats-effect" % Versions.catsEffect % Optional + val sttpCats = sttpClient3Org %% "cats" % Versions.sttpClient % Optional + Seq( + sttpCore, + sttpCirce, + sttpCats, + catsEffect ) ++ - testDependencies + testDependencies ++ + jsonSerdeDependencies } def databaseDependencies: Seq[ModuleID] = { diff --git a/project/JacocoSetup.scala b/project/JacocoSetup.scala index 635ea276d..4afcc950a 100644 --- a/project/JacocoSetup.scala +++ b/project/JacocoSetup.scala @@ -51,8 +51,10 @@ object JacocoSetup { "za.co.absa.atum.server.Constants*", "za.co.absa.atum.server.api.database.DoobieImplicits*", "za.co.absa.atum.server.api.database.TransactorProvider*", - "za.co.absa.atum.model.dto.*", - "za.co.absa.atum.model.envelopes.*" + "za.co.absa.atum.model.envelopes.Pagination", + "za.co.absa.atum.model.envelopes.ResponseEnvelope", + "za.co.absa.atum.model.envelopes.StatusResponse", + "za.co.absa.atum.model.envelopes.SuccessResponse" ) } diff --git a/project/Setup.scala b/project/Setup.scala index 319a06bcd..f986192f9 100644 --- a/project/Setup.scala +++ b/project/Setup.scala @@ -29,7 +29,7 @@ object Setup { val recommendedJava: Double = "11".toDouble val currentJava: Double = sys.props("java.specification.version").toDouble - //supported Scala versions + //possible supported Scala versions val scala211: Version = Version.asSemVer("2.11.12") val scala212: Version = Version.asSemVer("2.12.18") val scala213: Version = Version.asSemVer("2.13.11") @@ -43,19 +43,42 @@ object Setup { ) val serverAndDbScalaVersion: Version = scala213 //covers REST server and database modules - val clientSupportedScalaVersions: Seq[Version] = Seq(scala212, scala213) + val clientSupportedScalaVersions: Seq[Version] = Seq( + scala212, + scala213, + ) - val commonScalacOptions: Seq[String] = Seq("-unchecked", "-deprecation", "-feature", "-Xfatal-warnings") + val commonScalacOptions: Seq[String] = Seq( + "-unchecked", + "-deprecation", + "-feature", + "-Xfatal-warnings" + ) - val serverAndDbJavacOptions: Seq[String] = Seq("-source", "11", "-target", "11", "-Xlint") - val serverAndDbScalacOptions: Seq[String] = Seq("-Ymacro-annotations") + val serverAndDbJavacOptions: Seq[String] = Seq( + "-source", "11", + "-target", "11", + "-Xlint" + ) + val serverAndDbScalacOptions: Seq[String] = Seq( + "-language:higherKinds", + "-Ymacro-annotations" + ) val clientJavacOptions: Seq[String] = Seq("-source", "1.8", "-target", "1.8", "-Xlint") def clientScalacOptions(scalaVersion: Version): Seq[String] = { if (scalaVersion >= scala213) { - Seq("-release", "8", "-Ymacro-annotations") + Seq( + "-release", "8", + "-language:higherKinds", + "-Ymacro-annotations" + ) } else { - Seq("-release", "8", "-target:8") + Seq( + "-release", "8", + "-language:higherKinds", + "-target:8" + ) } } diff --git a/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala b/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala index 6c45d504e..6a99bbe40 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala @@ -16,9 +16,23 @@ package za.co.absa.atum.reader -class FlowReader { - def foo(): String = { - // just to have some testable content - "bar" - } +import sttp.client3.SttpBackend +import sttp.monad.MonadError +import za.co.absa.atum.model.types.basic.AtumPartitions +import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader} +import za.co.absa.atum.reader.server.ServerConfig + +/** + * This class is a reader that reads data tight to a flow. + * @param mainFlowPartitioning - the partitioning of the main flow; renamed from ancestor's 'flowPartitioning' + * @param serverConfig - the Atum server configuration + * @param backend - sttp backend, that will be executing the requests + * @param ev - using evidence based approach to ensure that the type F is a MonadError instead of using context + * bounds, as it make the imports easier to follow + * @tparam F - the effect type (e.g. Future, IO, Task, etc.) + */ +class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions) + (implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F]) + extends Reader[F] with PartitioningIdProvider[F]{ + } diff --git a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala index d1153e4b5..f103605b6 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala @@ -16,7 +16,24 @@ package za.co.absa.atum.reader -class PartitioningReader { +import sttp.client3.SttpBackend +import sttp.monad.MonadError +import za.co.absa.atum.model.types.basic.AtumPartitions +import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader} +import za.co.absa.atum.reader.server.ServerConfig + +/** + * + * @param partitioning - the Atum partitions to read the information from + * @param serverConfig - the Atum server configuration + * @param backend - sttp backend, that will be executing the requests + * @param ev - using evidence based approach to ensure that the type F is a MonadError instead of using context + * bounds, as it make the imports easier to follow + * @tparam F - the effect type (e.g. Future, IO, Task, etc.) + */ +case class PartitioningReader[F[_]](partitioning: AtumPartitions) + (implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F]) + extends Reader[F] with PartitioningIdProvider[F]{ def foo(): String = { // just to have some testable content "bar" diff --git a/reader/src/main/scala/za/co/absa/atum/reader/basic/PartitioningIdProvider.scala b/reader/src/main/scala/za/co/absa/atum/reader/basic/PartitioningIdProvider.scala new file mode 100644 index 000000000..8a802ff02 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/basic/PartitioningIdProvider.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.basic + +import sttp.monad.MonadError +import sttp.monad.syntax._ +import za.co.absa.atum.model.dto.PartitioningWithIdDTO +import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.model.types.basic.AtumPartitions +import za.co.absa.atum.model.types.basic.AtumPartitionsOps +import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax +import za.co.absa.atum.reader.basic.RequestResult.RequestResult + +trait PartitioningIdProvider[F[_]] {self: Reader[F] => + def partitioningId(partitioning: AtumPartitions)(implicit monad: MonadError[F]): F[RequestResult[Long]] = { + val encodedPartitioning = partitioning.toPartitioningDTO.asBase64EncodedJsonString + val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]]("/api/v2/partitionings", Map("partitioning" -> encodedPartitioning)) + queryResult.map{result => + result.map(_.data.id) + } + } +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/basic/Reader.scala b/reader/src/main/scala/za/co/absa/atum/reader/basic/Reader.scala new file mode 100644 index 000000000..325f8c6fe --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/basic/Reader.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.basic + +import io.circe.Decoder +import sttp.client3.{Identity, RequestT, ResponseException, SttpBackend, basicRequest} +import sttp.client3.circe.asJson +import sttp.model.Uri +import sttp.monad.MonadError +import sttp.monad.syntax._ +import za.co.absa.atum.reader.server.ServerConfig +import za.co.absa.atum.reader.basic.RequestResult._ + +/** + * Reader is a base class for reading data from a remote server. + * @param serverConfig - the configuration how to reach the Atum server + * @param backend - sttp backend to use to send requests + * @tparam F - the monadic effect used to get the data (e.g. Future, IO, Task, etc.) + * the context bind for the F type is MonadError to allow not just map, flatMap but eventually + * also error handling easily on a higher level + */ +abstract class Reader[F[_]: MonadError](implicit val serverConfig: ServerConfig, val backend: SttpBackend[F, Any]) { + + protected def getQuery[R: Decoder](endpointUri: String, params: Map[String, String] = Map.empty): F[RequestResult[R]] = { + val endpointToQuery = serverConfig.host + endpointUri + val uri = Uri.unsafeParse(endpointToQuery).addParams(params) + val request: RequestT[Identity, Either[ResponseException[String, CirceError], R], Any] = basicRequest + .get(uri) + .response(asJson[R]) + + val response = backend.send(request) + + response.map(_.toRequestResult) + } +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/basic/RequestResult.scala b/reader/src/main/scala/za/co/absa/atum/reader/basic/RequestResult.scala new file mode 100644 index 000000000..76e8cbfa9 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/basic/RequestResult.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.basic + +import sttp.client3.{DeserializationException, HttpError, Response, ResponseException} +import za.co.absa.atum.model.envelopes.ErrorResponse + +object RequestResult { + type CirceError = io.circe.Error + type RequestResult[R] = Either[ResponseException[ErrorResponse, CirceError], R] + + implicit class ResponseOps[R](val response: Response[Either[ResponseException[String, CirceError], R]]) extends AnyVal { + def toRequestResult: RequestResult[R] = { + response.body.left.map { + case he: HttpError[String] => + ErrorResponse.basedOnStatusCode(he.statusCode.code, he.body) match { + case Right(er) => HttpError(er, he.statusCode) + case Left(ce) => DeserializationException(he.body, ce) + } + case de: DeserializationException[CirceError] => de + } + } + } +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/implicits/future.scala b/reader/src/main/scala/za/co/absa/atum/reader/implicits/future.scala new file mode 100644 index 000000000..23f6c0f80 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/implicits/future.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.implicits + +import sttp.monad.{FutureMonad => SttpFutureMonad} + +import scala.concurrent.ExecutionContext.Implicits.global + +object future { + final implicit val futureMonadError: SttpFutureMonad = new SttpFutureMonad +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/implicits/io.scala b/reader/src/main/scala/za/co/absa/atum/reader/implicits/io.scala new file mode 100644 index 000000000..96a148e13 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/implicits/io.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.implicits + +import cats.effect.IO +import sttp.client3.impl.cats.CatsMonadAsyncError + +object io { + final implicit val catsIOMonadError: CatsMonadAsyncError[IO] = new CatsMonadAsyncError[IO] +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/server/ServerConfig.scala b/reader/src/main/scala/za/co/absa/atum/reader/server/ServerConfig.scala new file mode 100644 index 000000000..92e90eb8e --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/server/ServerConfig.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.server + +import com.typesafe.config.{Config, ConfigFactory} + +/** + * A case class representing the configuration to connect to an Atum server instance. + * @param host The URL of the Atum server instance. + */ +case class ServerConfig (host: String) + +object ServerConfig { + final val HostKey = "atum.server.url" + + def fromConfig(config: Config = ConfigFactory.load()): ServerConfig = { + ServerConfig(config.getString(HostKey)) + } +} diff --git a/reader/src/test/resources/reference.conf b/reader/src/test/resources/reference.conf new file mode 100644 index 000000000..4357da397 --- /dev/null +++ b/reader/src/test/resources/reference.conf @@ -0,0 +1,15 @@ +# Copyright 2021 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The REST API URI of the atum server +atum.server.url="http://localhost:8080" diff --git a/reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala index 3800801a2..3a85f6124 100644 --- a/reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala +++ b/reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala @@ -17,10 +17,25 @@ package za.co.absa.atum.reader import org.scalatest.funsuite.AnyFunSuiteLike +import sttp.client3.SttpBackend +import sttp.client3.testing.SttpBackendStub +import za.co.absa.atum.model.types.basic.AtumPartitions +import za.co.absa.atum.reader.server.ServerConfig +import za.co.absa.atum.reader.implicits.future.futureMonadError + +import scala.concurrent.Future class FlowReaderUnitTests extends AnyFunSuiteLike { - test("foo") { - val expected = new FlowReader().foo() - assert(expected == "bar") + private implicit val serverConfig: ServerConfig = ServerConfig.fromConfig() + + test("mainFlowPartitioning is the same as partitioning") { + val atumPartitions: AtumPartitions = AtumPartitions(List( + "a" -> "b", + "c" -> "d" + )) + implicit val server: SttpBackend[Future, Any] = SttpBackendStub.asynchronousFuture + + val result = new FlowReader(atumPartitions).mainFlowPartitioning + assert(result == atumPartitions) } } diff --git a/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala index c4221d8c5..1a5d21a01 100644 --- a/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala +++ b/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala @@ -17,10 +17,26 @@ package za.co.absa.atum.reader import org.scalatest.funsuite.AnyFunSuiteLike +import sttp.client3.SttpBackend +import sttp.client3.testing.SttpBackendStub +import za.co.absa.atum.model.types.basic.AtumPartitions +import za.co.absa.atum.reader.server.ServerConfig +import za.co.absa.atum.reader.implicits.future.futureMonadError + +import scala.concurrent.Future + + class PartitioningReaderUnitTests extends AnyFunSuiteLike { + private implicit val serverConfig: ServerConfig = ServerConfig.fromConfig() + test("foo") { - val expected = new PartitioningReader().foo() - assert(expected == "bar") + val atumPartitions: AtumPartitions = AtumPartitions(List( + "a" -> "b", + "c" -> "d" + )) + implicit val server: SttpBackend[Future, Any] = SttpBackendStub.asynchronousFuture + val result = PartitioningReader(atumPartitions).foo() + assert(result == "bar") } } diff --git a/reader/src/test/scala/za/co/absa/atum/reader/basic/PartitioningIdProviderUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/basic/PartitioningIdProviderUnitTests.scala new file mode 100644 index 000000000..20cac1a02 --- /dev/null +++ b/reader/src/test/scala/za/co/absa/atum/reader/basic/PartitioningIdProviderUnitTests.scala @@ -0,0 +1,93 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.basic + +import org.scalatest.funsuite.AnyFunSuiteLike +import sttp.capabilities +import sttp.client3._ +import sttp.client3.monad.IdMonad +import sttp.client3.testing.SttpBackendStub +import sttp.model._ +import sttp.monad.MonadError +import za.co.absa.atum.model.dto.PartitioningWithIdDTO +import za.co.absa.atum.model.envelopes.NotFoundErrorResponse +import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.model.types.basic.{AtumPartitions, AtumPartitionsOps} +import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax +import za.co.absa.atum.reader.basic.RequestResult._ +import za.co.absa.atum.reader.server.ServerConfig + +class PartitioningIdProviderUnitTests extends AnyFunSuiteLike { + private val serverUrl = "http://localhost:8080" + private val atumPartitionsToReply = AtumPartitions("a", "b") + private val atumPartitionsToFailedDecode = AtumPartitions("c", "d") + private val atumPartitionsToNotFound = AtumPartitions(List.empty) + + private implicit val serverConfig: ServerConfig = ServerConfig(serverUrl) + private implicit val monad: IdMonad.type = IdMonad + private implicit val server: SttpBackendStub[Identity, capabilities.WebSockets] = SttpBackendStub.synchronous + .whenRequestMatches(request => isUriOfAtumPartitions(request.uri, atumPartitionsToReply)) + .thenRespond(SingleSuccessResponse(PartitioningWithIdDTO(1, atumPartitionsToReply.toPartitioningDTO, "Gimli")).asJsonString) + .whenRequestMatches(request => isUriOfAtumPartitions(request.uri, atumPartitionsToFailedDecode)) + .thenRespond("This is not a correct JSON") + .whenRequestMatches(request => isUriOfAtumPartitions(request.uri, atumPartitionsToNotFound)) + .thenRespond(NotFoundErrorResponse("Partitioning not found").asJsonString, StatusCode.NotFound) + + private def isUriOfAtumPartitions(uri: Uri, atumPartitions: AtumPartitions): Boolean = { + val encodedPartitions = atumPartitions.toPartitioningDTO.asBase64EncodedJsonString + val targetUri = uri"$serverUrl/api/v2/partitionings?partitioning=$encodedPartitions" + uri == targetUri + } + + + private case class ReaderWithPartitioningIdForTest(partitioning: AtumPartitions) + (implicit serverConfig: ServerConfig) + extends Reader[Identity] with PartitioningIdProvider[Identity]{ + + override def partitioningId(partitioning: AtumPartitions) + (implicit monad: MonadError[Identity]): Identity[RequestResult[Long]] = + super.partitioningId(partitioning) + } + + + test("Gets the partitioning id") { + val reader = ReaderWithPartitioningIdForTest(atumPartitionsToReply) + val response = reader.partitioningId(atumPartitionsToReply) + val result: Long = response.getOrElse(throw new Exception("Failed to get partitioning id")) + assert(result == 1) + } + + test("Not found on the partitioning id") { + val reader = ReaderWithPartitioningIdForTest(atumPartitionsToNotFound) + val result = reader.partitioningId(atumPartitionsToNotFound) + result match { + case Right(_) => fail("Expected a failure, but OK response received") + case Left(_: DeserializationException[CirceError]) => fail("Expected a not found response, but deserialization error received") + case Left(x: HttpError[_]) => + assert(x.body.isInstanceOf[NotFoundErrorResponse]) + assert(x.statusCode == StatusCode.NotFound) + case _ => fail("Unexpected response") + } + } + + test("Failure to decode response body") { + val reader = ReaderWithPartitioningIdForTest(atumPartitionsToFailedDecode) + val result = reader.partitioningId(atumPartitionsToFailedDecode) + assert(result.isLeft) + result.swap.map(e => assert(e.isInstanceOf[DeserializationException[CirceError]])) + } +} diff --git a/reader/src/test/scala/za/co/absa/atum/reader/basic/Reader_CatsIOUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/basic/Reader_CatsIOUnitTests.scala new file mode 100644 index 000000000..1aaad0901 --- /dev/null +++ b/reader/src/test/scala/za/co/absa/atum/reader/basic/Reader_CatsIOUnitTests.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.basic + +import cats.effect.unsafe.implicits.global +import io.circe.Decoder +import org.scalatest.funsuite.AnyFunSuiteLike +import sttp.client3.SttpBackend +import sttp.client3.testing.SttpBackendStub +import sttp.monad.{MonadAsyncError, MonadError} +import za.co.absa.atum.model.dto.PartitionDTO +import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax +import za.co.absa.atum.reader.basic.RequestResult.RequestResult +import za.co.absa.atum.reader.server.ServerConfig + +class Reader_CatsIOUnitTests extends AnyFunSuiteLike { + private implicit val serverConfig: ServerConfig = ServerConfig("http://localhost:8080") + + private class ReaderForTest[F[_]](implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F]) + extends Reader { + override def getQuery[R: Decoder](endpointUri: String, params: Map[String, String]): F[RequestResult[R]] = super.getQuery(endpointUri, params) + } + + test("Using Cats IO based backend") { + import cats.effect.IO + import za.co.absa.atum.reader.implicits.io.catsIOMonadError + + val partitionDTO = PartitionDTO("someKey", "someValue") + implicit val server: SttpBackendStub[IO, Any] = SttpBackendStub[IO, Any](implicitly[MonadAsyncError[IO]]) + .whenAnyRequest.thenRespond(partitionDTO.asJsonString) + + val reader = new ReaderForTest + val query = reader.getQuery[PartitionDTO]("/test", Map.empty) + val result = query.unsafeRunSync() + assert(result == Right(partitionDTO)) + } + +} diff --git a/reader/src/test/scala/za/co/absa/atum/reader/basic/Reader_FutureUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/basic/Reader_FutureUnitTests.scala new file mode 100644 index 000000000..c19c6411d --- /dev/null +++ b/reader/src/test/scala/za/co/absa/atum/reader/basic/Reader_FutureUnitTests.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.basic + +import io.circe.Decoder +import org.scalatest.funsuite.AnyFunSuiteLike +import sttp.client3.SttpBackend +import sttp.client3.testing.SttpBackendStub +import sttp.monad.MonadError +import za.co.absa.atum.model.dto.PartitionDTO +import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax +import za.co.absa.atum.reader.basic.RequestResult.RequestResult +import za.co.absa.atum.reader.server.ServerConfig + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + +class Reader_FutureUnitTests extends AnyFunSuiteLike { + private implicit val serverConfig: ServerConfig = ServerConfig("http://localhost:8080") + + private class ReaderForTest[F[_]](implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F]) + extends Reader { + override def getQuery[R: Decoder](endpointUri: String, params: Map[String, String]): F[RequestResult[R]] = super.getQuery(endpointUri, params) + } + + test("Using Future based backend") { + import za.co.absa.atum.reader.implicits.future.futureMonadError + + val partitionDTO = PartitionDTO("someKey", "someValue") + implicit val server: SttpBackend[Future, Any] = SttpBackendStub.asynchronousFuture + .whenAnyRequest.thenRespond(partitionDTO.asJsonString) + + val reader = new ReaderForTest + val resultToBe = reader.getQuery[PartitionDTO]("/test", Map.empty) + val result = Await.result(resultToBe, Duration(3, "second")) + assert(result == Right(partitionDTO)) + } + +} diff --git a/reader/src/test/scala/za/co/absa/atum/reader/basic/RequestResultUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/basic/RequestResultUnitTests.scala new file mode 100644 index 000000000..d181154df --- /dev/null +++ b/reader/src/test/scala/za/co/absa/atum/reader/basic/RequestResultUnitTests.scala @@ -0,0 +1,83 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.basic + +import io.circe.ParsingFailure +import org.scalatest.funsuite.AnyFunSuiteLike +import sttp.client3.{DeserializationException, HttpError, Response, ResponseException} +import sttp.model.StatusCode +import za.co.absa.atum.model.dto.PartitionDTO +import za.co.absa.atum.model.envelopes.NotFoundErrorResponse +import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax +import za.co.absa.atum.reader.basic.RequestResult._ + +class RequestResultUnitTests extends AnyFunSuiteLike { + test("Response.toRequestResult keeps the right value") { + val partitionDTO = PartitionDTO("someKey", "someValue") + val body = Right(partitionDTO) + val source: Response[Either[ResponseException[String, CirceError], PartitionDTO]] = Response( + body, + StatusCode.Ok + ) + val result = source.toRequestResult + assert(result == body) + } + + test("Response.toRequestResult keeps the left value if it's a CirceError") { + val circeError: CirceError = ParsingFailure("Just a test error", new Exception) + val deserializationException = DeserializationException("This is not a json", circeError) + val body = Left(deserializationException) + val source: Response[Either[ResponseException[String, CirceError], PartitionDTO]] = Response( + body, + StatusCode.Ok + ) + val result = source.toRequestResult + assert(result == body) + } + + test("Response.toRequestResult decodes NotFound error") { + val error = NotFoundErrorResponse("This is a test") + val errorResponse = error.asJsonString + val httpError = HttpError(errorResponse, StatusCode.NotFound) + val source: Response[Either[ResponseException[String, CirceError], PartitionDTO]] = Response( + Left(httpError), + StatusCode.Ok + ) + val result = source.toRequestResult + val expected: RequestResult[PartitionDTO] = Left(HttpError(error, httpError.statusCode)) + assert(result == expected) + } + + test("Response.toRequestResult fails to decode InternalServerErrorResponse error") { + val responseBody = "This is not a json" + val httpError = HttpError(responseBody, StatusCode.InternalServerError) + val source: Response[Either[ResponseException[String, CirceError], PartitionDTO]] = Response( + Left(httpError), + StatusCode.Ok + ) + val result = source.toRequestResult + + assert(result.isLeft) + result.swap.foreach { e => + // investigate the error + assert(e.isInstanceOf[DeserializationException[_]]) + val ce = e.asInstanceOf[DeserializationException[ParsingFailure]] + assert(ce.body == responseBody) + } + } + +} diff --git a/reader/src/test/scala/za/co/absa/atum/reader/server/ServerConfigUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/server/ServerConfigUnitTests.scala new file mode 100644 index 000000000..cc5c1dd5a --- /dev/null +++ b/reader/src/test/scala/za/co/absa/atum/reader/server/ServerConfigUnitTests.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.server + +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} +import org.scalatest.funsuite.AnyFunSuiteLike + +class ServerConfigUnitTests extends AnyFunSuiteLike { + + test("test build from config") { + val server = "https://rivendell.middleearth.jrrt" + val config: Config = ConfigFactory.empty() + .withValue(ServerConfig.HostKey, ConfigValueFactory.fromAnyRef(server)) + val serverConfig = ServerConfig.fromConfig(config) + assert(serverConfig.host == server) + } +}