#245 Add the ability to query REST endpoints from Reader module (#297)
* class `Reader`, an expected ancestor of all readers, able to query the Atum server easily
* class `ServerSetup` to pack Atum server access information
* trait `PartitioningIdProvider` to add the ability to easily get a Partitioning ID
* `RequestResult[R]` represents an Atum server query response
* Offered implicits for the `MonadError` type class needed by `Reader` and `ReaderWithPartitioningId` - instances are provided for `Future` and Cats `IO`
* `AtumPartitions` and `AdditionalData` moved from _Agent_ to _Model_
* `ErrorResponse` received a method to decode from JSON based on the HTTP status code
* README.md updated
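The status-code-based decoding mentioned above can be illustrated with a simplified, dependency-free sketch. This is not the commit's actual implementation — the real method uses circe's `decode` on the JSON body and returns `Either[io.circe.Error, ErrorResponse]` — but it shows the dispatch idea:

```scala
// Simplified, dependency-free sketch of picking an error-response type based
// on the HTTP status code (hypothetical stand-in for the circe-based version).
sealed trait ErrorResponse { def message: String }
final case class BadRequestResponse(message: String) extends ErrorResponse
final case class UnauthorizedErrorResponse(message: String) extends ErrorResponse
final case class NotFoundErrorResponse(message: String) extends ErrorResponse
final case class GeneralErrorResponse(message: String) extends ErrorResponse

def basedOnStatusCode(statusCode: Int, message: String): ErrorResponse =
  statusCode match {
    case 400 => BadRequestResponse(message)
    case 401 => UnauthorizedErrorResponse(message)
    case 404 => NotFoundErrorResponse(message)
    case _   => GeneralErrorResponse(message)
  }
```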

---------

Co-authored-by: Ladislav Sulak <[email protected]>
benedeki and lsulak authored Dec 4, 2024
1 parent 8603720 commit fec3db4
Showing 38 changed files with 1,143 additions and 123 deletions.
16 changes: 8 additions & 8 deletions .github/workflows/jacoco_report.yml
Original file line number Diff line number Diff line change
@@ -109,14 +109,14 @@ jobs:
- name: Get the Coverage info
if: steps.jacocorun.outcome == 'success'
run: |
echo "Total agent module coverage ${{ steps.jacoco-agent.outputs.coverage-overall }}"
echo "Changed Files coverage ${{ steps.jacoco-agent.outputs.coverage-changed-files }}"
echo "Total agent module coverage ${{ steps.jacoco-reader.outputs.coverage-overall }}"
echo "Changed Files coverage ${{ steps.jacoco-reader.outputs.coverage-changed-files }}"
echo "Total model module coverage ${{ steps.jacoco-model.outputs.coverage-overall }}"
echo "Changed Files coverage ${{ steps.jacoco-model.outputs.coverage-changed-files }}"
echo "Total server module coverage ${{ steps.jacoco-server.outputs.coverage-overall }}"
echo "Changed Files coverage ${{ steps.jacoco-server.outputs.coverage-changed-files }}"
echo "Total 'agent' module coverage ${{ steps.jacoco-agent.outputs.coverage-overall }}"
echo "Changed files of 'agent' module coverage ${{ steps.jacoco-agent.outputs.coverage-changed-files }}"
echo "Total 'reader' module coverage ${{ steps.jacoco-reader.outputs.coverage-overall }}"
echo "Changed files of 'reader' module coverage ${{ steps.jacoco-reader.outputs.coverage-changed-files }}"
echo "Total 'model' module coverage ${{ steps.jacoco-model.outputs.coverage-overall }}"
echo "Changed files of 'model' module coverage ${{ steps.jacoco-model.outputs.coverage-changed-files }}"
echo "Total 'server' module coverage ${{ steps.jacoco-server.outputs.coverage-overall }}"
echo "Changed files of 'server' module coverage ${{ steps.jacoco-server.outputs.coverage-changed-files }}"
- name: Fail PR if changed files coverage is less than ${{ env.coverage-changed-files }}%
if: steps.jacocorun.outcome == 'success'
uses: actions/github-script@v6
3 changes: 2 additions & 1 deletion .github/workflows/test_filenames_check.yml
@@ -41,6 +41,7 @@ jobs:
excludes: |
server/src/test/scala/za/co/absa/atum/server/api/TestData.scala,
server/src/test/scala/za/co/absa/atum/server/api/TestTransactorProvider.scala,
server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala
server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala,
model/src/test/scala/za/co/absa/atum/testing/*
verbose-logging: 'false'
fail-on-violation: 'true'
2 changes: 1 addition & 1 deletion README.md
@@ -205,7 +205,6 @@ We can even say, that `Checkpoint` is a result of particular `Measurements` (ver
The journey of a dataset throughout various data transformations and pipelines. It captures the whole journey,
even if it involves multiple applications or ETL pipelines.


## Usage

### Atum Agent routines
@@ -247,6 +246,7 @@ Code coverage will be generated on path:
To make this project runnable via IntelliJ, do the following:
- Make sure that your configuration in `server/src/main/resources/reference.conf`
is configured according to your needs
- When building within an IDE, make sure to have the option `-language:higherKinds` enabled in the compiler options, as it is often not picked up from the SBT project settings.
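The same flag can also be made explicit in the build definition so that IDE and command-line builds agree (a minimal `build.sbt` sketch; the repository's actual settings layout may differ):

```scala
// build.sbt — enable higher-kinded types project-wide
scalacOptions += "-language:higherKinds"
```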

## How to Run Tests

10 changes: 5 additions & 5 deletions agent/README.md
@@ -7,26 +7,26 @@

## Usage

Create multiple `AtumContext` with different control measures to be applied

### Option 1
```scala
val atumContextInstanceWithRecordCount = AtumContext(processor = processor)
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, measuredColumn = "id"))
.withMeasureAdded(RecordCount(MockMeasureNames.recordCount1))

val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount
.withMeasureAdded(AbsSumOfValuesOfColumn(measuredColumn = "salary"))
```

### Option 2
Use `AtumPartitions` to get an `AtumContext` from the service using the `AtumAgent`.
```scala
val atumContext1 = AtumAgent.createAtumContext(atumPartition)
```

#### AtumPartitions
A list of key-value pairs that maintains the order in which items arrived; based on the `AtumPartitions` we give it, the `AtumService` is able to deliver the correct `AtumContext`.
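Since `AtumPartitions` is backed by `ListMap[String, String]` (after this commit, in `za.co.absa.atum.model.types.basic`), insertion order is preserved — a quick stand-alone illustration using plain `ListMap`:

```scala
import scala.collection.immutable.ListMap

// ListMap remembers insertion order, unlike a regular Map — this is what
// lets AtumPartitions reproduce the order in which partitions were added.
val partitions = ListMap("name" -> "partition-name", "country" -> "SA", "gender" -> "female")
assert(partitions.keys.toList == List("name", "country", "gender"))
```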
```scala
val atumPartitions = AtumPartitions().withPartitions(ListMap("name" -> "partition-name", "country" -> "SA", "gender" -> "female" ))

11 changes: 6 additions & 5 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
@@ -17,9 +17,10 @@
package za.co.absa.atum.agent

import com.typesafe.config.{Config, ConfigFactory}
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.dispatcher.{CapturingDispatcher, ConsoleDispatcher, Dispatcher, HttpDispatcher}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.model.types.basic.AtumPartitionsOps

/**
* Entity that communicate with the API, primarily focused on spawning Atum Context(s).
@@ -58,7 +59,7 @@ trait AtumAgent {
atumPartitions: AtumPartitions,
additionalDataPatchDTO: AdditionalDataPatchDTO
): AdditionalDataDTO = {
dispatcher.updateAdditionalData(AtumPartitions.toSeqPartitionDTO(atumPartitions), additionalDataPatchDTO)
dispatcher.updateAdditionalData(atumPartitions.toPartitioningDTO, additionalDataPatchDTO)
}

/**
@@ -75,7 +76,7 @@
*/
def getOrCreateAtumContext(atumPartitions: AtumPartitions): AtumContext = {
val authorIfNew = AtumAgent.currentUser
val partitioningDTO = PartitioningSubmitDTO(AtumPartitions.toSeqPartitionDTO(atumPartitions), None, authorIfNew)
val partitioningDTO = PartitioningSubmitDTO(atumPartitions.toPartitioningDTO, None, authorIfNew)

val atumContextDTO = dispatcher.createPartitioning(partitioningDTO)
val atumContext = AtumContext.fromDTO(atumContextDTO, this)
@@ -94,8 +95,8 @@
val authorIfNew = AtumAgent.currentUser
val newPartitions: AtumPartitions = parentAtumContext.atumPartitions ++ subPartitions

val newPartitionsDTO = AtumPartitions.toSeqPartitionDTO(newPartitions)
val parentPartitionsDTO = Some(AtumPartitions.toSeqPartitionDTO(parentAtumContext.atumPartitions))
val newPartitionsDTO = newPartitions.toPartitioningDTO
val parentPartitionsDTO = Some(parentAtumContext.atumPartitions.toPartitioningDTO)
val partitioningDTO = PartitioningSubmitDTO(newPartitionsDTO, parentPartitionsDTO, authorIfNew)

val atumContextDTO = dispatcher.createPartitioning(partitioningDTO)
35 changes: 4 additions & 31 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
@@ -17,14 +17,13 @@
package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.AtumContext.{AdditionalData, AtumPartitions}
import za.co.absa.atum.agent.exception.AtumAgentException.PartitioningUpdateException
import za.co.absa.atum.agent.model._
import za.co.absa.atum.model.dto._
import za.co.absa.atum.model.types.basic.{AdditionalData, AtumPartitions, AtumPartitionsOps, PartitioningDTOOps}

import java.time.ZonedDateTime
import java.util.UUID
import scala.collection.immutable.ListMap

/**
* This class provides the methods to measure Spark `Dataframe`. Also allows to add and remove measures.
@@ -91,7 +90,7 @@ class AtumContext private[agent] (
name = checkpointName,
author = agent.currentUser,
measuredByAtumAgent = true,
partitioning = AtumPartitions.toSeqPartitionDTO(atumPartitions),
partitioning = atumPartitions.toPartitioningDTO,
processStartTime = startTime,
processEndTime = Some(endTime),
measurements = measurementDTOs
@@ -115,7 +114,7 @@
id = UUID.randomUUID(),
name = checkpointName,
author = agent.currentUser,
partitioning = AtumPartitions.toSeqPartitionDTO(atumPartitions),
partitioning = atumPartitions.toPartitioningDTO,
processStartTime = dateTimeNow,
processEndTime = Some(dateTimeNow),
measurements = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)
@@ -206,36 +205,10 @@
}

object AtumContext {
/**
* Type alias for Atum partitions.
*/
type AtumPartitions = ListMap[String, String]
type AdditionalData = Map[String, Option[String]]

/**
* Object contains helper methods to work with Atum partitions.
*/
object AtumPartitions {
def apply(elems: (String, String)): AtumPartitions = {
ListMap(elems)
}

def apply(elems: List[(String, String)]): AtumPartitions = {
ListMap(elems:_*)
}

private[agent] def toSeqPartitionDTO(atumPartitions: AtumPartitions): PartitioningDTO = {
atumPartitions.map { case (key, value) => PartitionDTO(key, value) }.toSeq
}

private[agent] def fromPartitioning(partitioning: PartitioningDTO): AtumPartitions = {
AtumPartitions(partitioning.map(partition => Tuple2(partition.key, partition.value)).toList)
}
}

private[agent] def fromDTO(atumContextDTO: AtumContextDTO, agent: AtumAgent): AtumContext = {
new AtumContext(
AtumPartitions.fromPartitioning(atumContextDTO.partitioning),
atumContextDTO.partitioning.toAtumPartitions,
agent,
MeasuresBuilder.mapToMeasures(atumContextDTO.measures),
atumContextDTO.additionalData
@@ -18,8 +18,8 @@ package za.co.absa.atum.agent

import com.typesafe.config.{Config, ConfigException, ConfigFactory, ConfigValueFactory}
import org.scalatest.funsuite.AnyFunSuiteLike
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.dispatcher.{CapturingDispatcher, ConsoleDispatcher, HttpDispatcher}
import za.co.absa.atum.model.types.basic.AtumPartitions

class AtumAgentUnitTests extends AnyFunSuiteLike {

@@ -22,11 +22,11 @@ import org.mockito.ArgumentCaptor
import org.mockito.Mockito.{mock, times, verify, when}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.AtumMeasure.{RecordCount, SumOfValuesOfColumn}
import za.co.absa.atum.agent.model.{Measure, MeasureResult, MeasurementBuilder, UnknownMeasure}
import za.co.absa.atum.model.ResultValueType
import za.co.absa.atum.model.dto.CheckpointDTO
import za.co.absa.atum.model.types.basic._

class AtumContextUnitTests extends AnyFlatSpec with Matchers {

@@ -95,12 +95,12 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers {

val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent).saveCheckpoint(argument.capture())

assert(argument.getValue.name == "testCheckpoint")
assert(argument.getValue.author == authorTest)
assert(argument.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
assert(argument.getValue.measurements.head.result.mainValue.value == "3")
assert(argument.getValue.measurements.head.result.mainValue.valueType == ResultValueType.LongValue)
val value: CheckpointDTO = argument.getValue
assert(value.name == "testCheckpoint")
assert(value.author == authorTest)
assert(value.partitioning == atumPartitions.toPartitioningDTO)
assert(value.measurements.head.result.mainValue.value == "3")
assert(value.measurements.head.result.mainValue.valueType == ResultValueType.LongValue)
}

"createCheckpointOnProvidedData" should "create a Checkpoint on provided data" in {
@@ -123,13 +123,14 @@

val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent).saveCheckpoint(argument.capture())

assert(argument.getValue.name == "name")
assert(argument.getValue.author == authorTest)
assert(!argument.getValue.measuredByAtumAgent)
assert(argument.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
assert(argument.getValue.processStartTime == argument.getValue.processEndTime.get)
assert(argument.getValue.measurements == MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements))
val value: CheckpointDTO = argument.getValue

assert(value.name == "name")
assert(value.author == authorTest)
assert(!value.measuredByAtumAgent)
assert(value.partitioning == atumPartitions.toPartitioningDTO)
assert(value.processStartTime == value.processEndTime.get)
assert(value.measurements == MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements))
}

"createCheckpoint" should "take measurements and create a Checkpoint, multiple measure changes" in {
@@ -167,25 +168,27 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers {

val argumentFirst = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent, times(1)).saveCheckpoint(argumentFirst.capture())
val valueFirst: CheckpointDTO = argumentFirst.getValue

assert(argumentFirst.getValue.name == "checkPointNameCount")
assert(argumentFirst.getValue.author == authorTest)
assert(argumentFirst.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
assert(argumentFirst.getValue.measurements.head.result.mainValue.value == "4")
assert(argumentFirst.getValue.measurements.head.result.mainValue.valueType == ResultValueType.LongValue)
assert(valueFirst.name == "checkPointNameCount")
assert(valueFirst.author == authorTest)
assert(valueFirst.partitioning == atumPartitions.toPartitioningDTO)
assert(valueFirst.measurements.head.result.mainValue.value == "4")
assert(valueFirst.measurements.head.result.mainValue.valueType == ResultValueType.LongValue)

atumContext.addMeasure(SumOfValuesOfColumn("columnForSum"))
when(mockAgent.currentUser).thenReturn(authorTest + "Another") // maybe a process changed the author / current user
df.createCheckpoint("checkPointNameSum")

val argumentSecond = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent, times(2)).saveCheckpoint(argumentSecond.capture())
val valueSecond: CheckpointDTO = argumentSecond.getValue

assert(argumentSecond.getValue.name == "checkPointNameSum")
assert(argumentSecond.getValue.author == authorTest + "Another")
assert(argumentSecond.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.value == "22.5")
assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimalValue)
assert(valueSecond.name == "checkPointNameSum")
assert(valueSecond.author == authorTest + "Another")
assert(valueSecond.partitioning == atumPartitions.toPartitioningDTO)
assert(valueSecond.measurements.tail.head.result.mainValue.value == "22.5")
assert(valueSecond.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimalValue)
}

"addAdditionalData" should "add key/value pair to map for additional data" in {
@@ -21,9 +21,10 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructT
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumAgent
import za.co.absa.atum.agent.AtumContext.{AtumPartitions, DatasetWrapper}
import za.co.absa.atum.agent.AtumContext.DatasetWrapper
import za.co.absa.atum.agent.model.AtumMeasure._
import za.co.absa.atum.model.ResultValueType
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.spark.commons.test.SparkTestBase

class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase { self =>
@@ -19,11 +19,11 @@ package za.co.absa.atum.agent.model
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumAgent
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.AtumMeasure.{AbsSumOfValuesOfColumn, RecordCount, SumOfHashesOfColumn, SumOfValuesOfColumn}
import za.co.absa.spark.commons.test.SparkTestBase
import za.co.absa.atum.agent.AtumContext._
import za.co.absa.atum.model.ResultValueType
import za.co.absa.atum.model.types.basic.AtumPartitions

class MeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase { self =>

@@ -16,19 +16,26 @@

package za.co.absa.atum.model.envelopes

import io.circe.parser.decode
import io.circe._
import io.circe.generic.semiauto._

import java.util.UUID

object ErrorResponse {
  implicit val decodeErrorResponse: Decoder[ErrorResponse] = deriveDecoder
  implicit val encodeErrorResponse: Encoder[ErrorResponse] = deriveEncoder

  def basedOnStatusCode(statusCode: Int, jsonString: String): Either[Error, ErrorResponse] = {
    statusCode match {
      case 400 => decode[BadRequestResponse](jsonString)
      case 401 => decode[UnauthorizedErrorResponse](jsonString)
      case 404 => decode[NotFoundErrorResponse](jsonString)
      case 409 => decode[ConflictErrorResponse](jsonString)
      case 500 => decode[InternalServerErrorResponse](jsonString)
      case _ => decode[GeneralErrorResponse](jsonString)
    }
  }
}

sealed trait ErrorResponse extends ResponseEnvelope {
def message: String
}
sealed trait ErrorResponse extends ResponseEnvelope

final case class BadRequestResponse(message: String, requestId: UUID = UUID.randomUUID()) extends ErrorResponse

@@ -71,3 +78,11 @@ object ErrorInDataErrorResponse {
implicit val decoderInternalServerErrorResponse: Decoder[ErrorInDataErrorResponse] = deriveDecoder
implicit val encoderInternalServerErrorResponse: Encoder[ErrorInDataErrorResponse] = deriveEncoder
}

final case class UnauthorizedErrorResponse(message: String, requestId: UUID = UUID.randomUUID()) extends ErrorResponse

object UnauthorizedErrorResponse {
implicit val decoderInternalServerErrorResponse: Decoder[UnauthorizedErrorResponse] = deriveDecoder
implicit val encoderInternalServerErrorResponse: Encoder[UnauthorizedErrorResponse] = deriveEncoder
}
