#245 Add the ability to query REST endpoints from Reader module #297

Merged
Changes from 37 commits
Commits (38)
b97b603
#244: Create the Info module
benedeki Aug 19, 2024
e623974
* fixed License headers
benedeki Aug 19, 2024
5e4eadb
* renamed to _Reader_
benedeki Aug 24, 2024
2e1e2ea
* README.md update
benedeki Aug 25, 2024
738c904
* fix
benedeki Aug 25, 2024
df8c9bd
* JaCoCO action update
benedeki Aug 26, 2024
5affd82
* added dummy code for testing coverage
benedeki Aug 26, 2024
0f1e121
* erroneous class renamed
benedeki Aug 26, 2024
d773a93
* Deleted wrong files
benedeki Aug 26, 2024
0776f9c
#245 Add the ability to query REST endpoints from Reader module
benedeki Sep 10, 2024
38fde1c
* Work still in progress
benedeki Sep 23, 2024
1ac2233
* the first working commit
benedeki Nov 1, 2024
e6dcb52
* Removed temporary notes
benedeki Nov 1, 2024
6968b02
* introduced `MonadError` into the `GenericServerConnection`
benedeki Nov 1, 2024
b9bacef
* Fixed UTs
benedeki Nov 1, 2024
bbb1e7f
* trying to get rid of Java 11 dependency
benedeki Nov 4, 2024
33e6628
* Downgraded sttpClient
benedeki Nov 4, 2024
f7ced56
* further downgrade
benedeki Nov 5, 2024
ca2116b
* Removed exceptions
benedeki Nov 6, 2024
e5e6f63
* commented out parts of README.md which are not yet part of the code
benedeki Nov 6, 2024
fe07272
- major rework
benedeki Nov 17, 2024
7656f6f
* doc fix
benedeki Nov 17, 2024
eb9a678
Merge branch 'master' into feature/245-add-the-ability-to-query-rest-…
benedeki Nov 17, 2024
7641c07
* disabled failing test
benedeki Nov 17, 2024
bc82a5b
* adjustments
benedeki Nov 18, 2024
0e7675e
- further cleaning
benedeki Nov 18, 2024
432716a
* tests progress
benedeki Nov 21, 2024
11b0a16
* several UTs added
benedeki Nov 22, 2024
2c3f145
Merge branch 'master' into feature/245-add-the-ability-to-query-rest-…
benedeki Nov 22, 2024
e07dffb
* last improvements before PR ready
benedeki Nov 24, 2024
3955a50
* description to class `ServerConfig`
benedeki Nov 25, 2024
b287a66
* removed empty line
benedeki Nov 25, 2024
d04d23b
* addressed PR comments
benedeki Nov 27, 2024
c344249
* just better implementation
benedeki Nov 30, 2024
a4759d1
Apply suggestions from code review
benedeki Dec 4, 2024
5ca4bd5
* small fixes
benedeki Dec 4, 2024
d5054b6
Merge branch 'master' into feature/245-add-the-ability-to-query-rest-…
benedeki Dec 4, 2024
f8c1d86
Apply suggestions from code review
benedeki Dec 4, 2024
16 changes: 8 additions & 8 deletions .github/workflows/jacoco_report.yml
@@ -109,14 +109,14 @@ jobs:
- name: Get the Coverage info
if: steps.jacocorun.outcome == 'success'
run: |
echo "Total agent module coverage ${{ steps.jacoco-agent.outputs.coverage-overall }}"
echo "Changed Files coverage ${{ steps.jacoco-agent.outputs.coverage-changed-files }}"
echo "Total agent module coverage ${{ steps.jacoco-reader.outputs.coverage-overall }}"
echo "Changed Files coverage ${{ steps.jacoco-reader.outputs.coverage-changed-files }}"
echo "Total model module coverage ${{ steps.jacoco-model.outputs.coverage-overall }}"
echo "Changed Files coverage ${{ steps.jacoco-model.outputs.coverage-changed-files }}"
echo "Total server module coverage ${{ steps.jacoco-server.outputs.coverage-overall }}"
echo "Changed Files coverage ${{ steps.jacoco-server.outputs.coverage-changed-files }}"
echo "Total 'agent' module coverage ${{ steps.jacoco-agent.outputs.coverage-overall }}"
echo "Changed files of 'agent' module coverage ${{ steps.jacoco-agent.outputs.coverage-changed-files }}"
echo "Total 'reader' module coverage ${{ steps.jacoco-reader.outputs.coverage-overall }}"
echo "Changed files of 'reader' module coverage ${{ steps.jacoco-reader.outputs.coverage-changed-files }}"
echo "Total 'model' module coverage ${{ steps.jacoco-model.outputs.coverage-overall }}"
echo "Changed files of 'model' module coverage ${{ steps.jacoco-model.outputs.coverage-changed-files }}"
echo "Total 'server' module coverage ${{ steps.jacoco-server.outputs.coverage-overall }}"
echo "Changed files of 'server' module coverage ${{ steps.jacoco-server.outputs.coverage-changed-files }}"
- name: Fail PR if changed files coverage is less than ${{ env.coverage-changed-files }}%
if: steps.jacocorun.outcome == 'success'
uses: actions/github-script@v6
3 changes: 2 additions & 1 deletion .github/workflows/test_filenames_check.yml
@@ -41,6 +41,7 @@ jobs:
excludes: |
server/src/test/scala/za/co/absa/atum/server/api/TestData.scala,
server/src/test/scala/za/co/absa/atum/server/api/TestTransactorProvider.scala,
- server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala
+ server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala,
+ model/src/test/scala/za/co/absa/atum/testing/*
verbose-logging: 'false'
fail-on-violation: 'true'
2 changes: 1 addition & 1 deletion README.md
@@ -205,7 +205,6 @@ We can even say, that `Checkpoint` is a result of particular `Measurements` (ver
The journey of a dataset throughout various data transformations and pipelines. It captures the whole journey,
even if it involves multiple applications or ETL pipelines.


## Usage

### Atum Agent routines
@@ -247,6 +246,7 @@ Code coverage wil be generated on path:
To make this project runnable via IntelliJ, do the following:
- Make sure that your configuration in `server/src/main/resources/reference.conf`
is configured according to your needs
- When building within an IDE, be sure to have the `-language:higherKinds` compiler option enabled, as it's often not picked up from the SBT project settings (see the build.sbt sketch below this diff).

## How to Run Tests

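The `-language:higherKinds` note above refers to a flag that normally lives in the build definition so that an IDE importing the sbt project inherits it. Below is a minimal, hypothetical `build.sbt` sketch — the module name, Scala version, and option list are assumptions, not taken from this repository's actual build:

```scala
// Hypothetical build.sbt excerpt (not from this PR): declares the compiler flag centrally
// so an IDE that imports the sbt project picks it up. On recent Scala 2.13 releases the
// higher-kinded-types feature is enabled by default, so the flag mainly matters for 2.12.
ThisBuild / scalaVersion := "2.12.18"

lazy val reader = (project in file("reader"))
  .settings(
    // Allows F[_]-style type constructors (e.g. the MonadError-based GenericServerConnection
    // mentioned in the commit history) without a feature warning.
    scalacOptions ++= Seq("-language:higherKinds", "-deprecation", "-feature")
  )
```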
10 changes: 5 additions & 5 deletions agent/README.md
@@ -7,26 +7,26 @@

## Usage

Create multiple `AtumContext` with different control measures to be applied

### Option 1
```scala
val atumContextInstanceWithRecordCount = AtumContext(processor = processor)
- .withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, measuredColumn = "id"))
+ .withMeasureAdded(RecordCount(MockMeasureNames.recordCount1))

val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount
.withMeasureAdded(AbsSumOfValuesOfColumn(measuredColumn = "salary"))
```

### Option 2
Use `AtumPartitions` to get an `AtumContext` from the service using the `AtumAgent`.
```scala
val atumContext1 = AtumAgent.createAtumContext(atumPartition)
```

#### AtumPartitions
A list of key values that maintains the order of arrival of the items, the `AtumService`
is able to deliver the correct `AtumContext` according to the `AtumPartitions` we give it.
```scala
val atumPartitions = AtumPartitions().withPartitions(ListMap("name" -> "partition-name", "country" -> "SA", "gender" -> "female" ))

11 changes: 6 additions & 5 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
@@ -17,9 +17,10 @@
package za.co.absa.atum.agent

import com.typesafe.config.{Config, ConfigFactory}
- import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.dispatcher.{CapturingDispatcher, ConsoleDispatcher, Dispatcher, HttpDispatcher}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointDTO, PartitioningSubmitDTO}
+ import za.co.absa.atum.model.types.basic.AtumPartitions
+ import za.co.absa.atum.model.types.basic.AtumPartitionsOps

/**
* Entity that communicate with the API, primarily focused on spawning Atum Context(s).
@@ -58,7 +59,7 @@ trait AtumAgent {
atumPartitions: AtumPartitions,
additionalDataPatchDTO: AdditionalDataPatchDTO
): AdditionalDataDTO = {
- dispatcher.updateAdditionalData(AtumPartitions.toSeqPartitionDTO(atumPartitions), additionalDataPatchDTO)
+ dispatcher.updateAdditionalData(atumPartitions.toPartitioningDTO, additionalDataPatchDTO)
}

/**
@@ -75,7 +76,7 @@
*/
def getOrCreateAtumContext(atumPartitions: AtumPartitions): AtumContext = {
val authorIfNew = AtumAgent.currentUser
- val partitioningDTO = PartitioningSubmitDTO(AtumPartitions.toSeqPartitionDTO(atumPartitions), None, authorIfNew)
+ val partitioningDTO = PartitioningSubmitDTO(atumPartitions.toPartitioningDTO, None, authorIfNew)

val atumContextDTO = dispatcher.createPartitioning(partitioningDTO)
val atumContext = AtumContext.fromDTO(atumContextDTO, this)
@@ -94,8 +95,8 @@
val authorIfNew = AtumAgent.currentUser
val newPartitions: AtumPartitions = parentAtumContext.atumPartitions ++ subPartitions

- val newPartitionsDTO = AtumPartitions.toSeqPartitionDTO(newPartitions)
- val parentPartitionsDTO = Some(AtumPartitions.toSeqPartitionDTO(parentAtumContext.atumPartitions))
+ val newPartitionsDTO = newPartitions.toPartitioningDTO
+ val parentPartitionsDTO = Some(parentAtumContext.atumPartitions.toPartitioningDTO)
val partitioningDTO = PartitioningSubmitDTO(newPartitionsDTO, parentPartitionsDTO, authorIfNew)

val atumContextDTO = dispatcher.createPartitioning(partitioningDTO)
35 changes: 4 additions & 31 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
@@ -17,14 +17,13 @@
package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
- import za.co.absa.atum.agent.AtumContext.{AdditionalData, AtumPartitions}
import za.co.absa.atum.agent.exception.AtumAgentException.PartitioningUpdateException
import za.co.absa.atum.agent.model._
import za.co.absa.atum.model.dto._
+ import za.co.absa.atum.model.types.basic.{AdditionalData, AtumPartitions, AtumPartitionsOps, PartitioningDTOOps}

import java.time.ZonedDateTime
import java.util.UUID
- import scala.collection.immutable.ListMap

/**
* This class provides the methods to measure Spark `Dataframe`. Also allows to add and remove measures.
@@ -91,7 +90,7 @@ class AtumContext private[agent] (
name = checkpointName,
author = agent.currentUser,
measuredByAtumAgent = true,
- partitioning = AtumPartitions.toSeqPartitionDTO(atumPartitions),
+ partitioning = atumPartitions.toPartitioningDTO,
processStartTime = startTime,
processEndTime = Some(endTime),
measurements = measurementDTOs
@@ -115,7 +114,7 @@
id = UUID.randomUUID(),
name = checkpointName,
author = agent.currentUser,
- partitioning = AtumPartitions.toSeqPartitionDTO(atumPartitions),
+ partitioning = atumPartitions.toPartitioningDTO,
processStartTime = dateTimeNow,
processEndTime = Some(dateTimeNow),
measurements = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)
@@ -206,36 +205,10 @@ class AtumContext private[agent] (
}

object AtumContext {
- /**
- * Type alias for Atum partitions.
- */
- type AtumPartitions = ListMap[String, String]
- type AdditionalData = Map[String, Option[String]]
-
- /**
- * Object contains helper methods to work with Atum partitions.
- */
- object AtumPartitions {
- def apply(elems: (String, String)): AtumPartitions = {
- ListMap(elems)
- }
-
- def apply(elems: List[(String, String)]): AtumPartitions = {
- ListMap(elems:_*)
- }
-
- private[agent] def toSeqPartitionDTO(atumPartitions: AtumPartitions): PartitioningDTO = {
- atumPartitions.map { case (key, value) => PartitionDTO(key, value) }.toSeq
- }
-
- private[agent] def fromPartitioning(partitioning: PartitioningDTO): AtumPartitions = {
- AtumPartitions(partitioning.map(partition => Tuple2(partition.key, partition.value)).toList)
- }
- }

private[agent] def fromDTO(atumContextDTO: AtumContextDTO, agent: AtumAgent): AtumContext = {
new AtumContext(
- AtumPartitions.fromPartitioning(atumContextDTO.partitioning),
+ atumContextDTO.partitioning.toAtumPartitions,
agent,
MeasuresBuilder.mapToMeasures(atumContextDTO.measures),
atumContextDTO.additionalData
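
The `za.co.absa.atum.model.types.basic` package that the new imports point to is not shown in this PR's visible diff. Judging from the helper object removed above and the call sites (`atumPartitions.toPartitioningDTO`, `atumContextDTO.partitioning.toAtumPartitions`), it plausibly carries the type aliases plus implicit ops classes along these lines — a sketch inferred from the diff, not the actual file:

```scala
package za.co.absa.atum.model.types

import za.co.absa.atum.model.dto.{PartitionDTO, PartitioningDTO}

import scala.collection.immutable.ListMap

// Hypothetical reconstruction of the `basic` module introduced by this PR,
// based on the aliases and helpers deleted from AtumContext above.
package object basic {
  type AtumPartitions = ListMap[String, String]
  type AdditionalData = Map[String, Option[String]]

  object AtumPartitions {
    def apply(elems: (String, String)): AtumPartitions = ListMap(elems)
    def apply(elems: List[(String, String)]): AtumPartitions = ListMap(elems: _*)
  }

  // Enables `atumPartitions.toPartitioningDTO`, replacing AtumPartitions.toSeqPartitionDTO.
  implicit class AtumPartitionsOps(val atumPartitions: AtumPartitions) extends AnyVal {
    def toPartitioningDTO: PartitioningDTO =
      atumPartitions.map { case (key, value) => PartitionDTO(key, value) }.toSeq
  }

  // Enables `partitioning.toAtumPartitions`, replacing AtumPartitions.fromPartitioning.
  implicit class PartitioningDTOOps(val partitioning: PartitioningDTO) extends AnyVal {
    def toAtumPartitions: AtumPartitions =
      AtumPartitions(partitioning.map(p => (p.key, p.value)).toList)
  }
}
```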
@@ -18,8 +18,8 @@ package za.co.absa.atum.agent

import com.typesafe.config.{Config, ConfigException, ConfigFactory, ConfigValueFactory}
import org.scalatest.funsuite.AnyFunSuiteLike
- import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.dispatcher.{CapturingDispatcher, ConsoleDispatcher, HttpDispatcher}
+ import za.co.absa.atum.model.types.basic.AtumPartitions

class AtumAgentUnitTests extends AnyFunSuiteLike {

@@ -22,11 +22,11 @@ import org.mockito.ArgumentCaptor
import org.mockito.Mockito.{mock, times, verify, when}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
- import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.AtumMeasure.{RecordCount, SumOfValuesOfColumn}
import za.co.absa.atum.agent.model.{Measure, MeasureResult, MeasurementBuilder, UnknownMeasure}
import za.co.absa.atum.model.ResultValueType
import za.co.absa.atum.model.dto.CheckpointDTO
+ import za.co.absa.atum.model.types.basic._

class AtumContextUnitTests extends AnyFlatSpec with Matchers {

@@ -95,12 +95,12 @@ class AtumContextUnitTests extends AnyFlatSpec with Matchers {

val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent).saveCheckpoint(argument.capture())

- assert(argument.getValue.name == "testCheckpoint")
- assert(argument.getValue.author == authorTest)
- assert(argument.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
- assert(argument.getValue.measurements.head.result.mainValue.value == "3")
- assert(argument.getValue.measurements.head.result.mainValue.valueType == ResultValueType.LongValue)
+ val value: CheckpointDTO = argument.getValue
+ assert(value.name == "testCheckpoint")
+ assert(value.author == authorTest)
+ assert(value.partitioning == atumPartitions.toPartitioningDTO)
+ assert(value.measurements.head.result.mainValue.value == "3")
+ assert(value.measurements.head.result.mainValue.valueType == ResultValueType.LongValue)
}

"createCheckpointOnProvidedData" should "create a Checkpoint on provided data" in {
@@ -123,13 +123,14 @@

val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent).saveCheckpoint(argument.capture())

- assert(argument.getValue.name == "name")
- assert(argument.getValue.author == authorTest)
- assert(!argument.getValue.measuredByAtumAgent)
- assert(argument.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
- assert(argument.getValue.processStartTime == argument.getValue.processEndTime.get)
- assert(argument.getValue.measurements == MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements))
+ val value: CheckpointDTO = argument.getValue
+
+ assert(value.name == "name")
+ assert(value.author == authorTest)
+ assert(!value.measuredByAtumAgent)
+ assert(value.partitioning == atumPartitions.toPartitioningDTO)
+ assert(value.processStartTime == value.processEndTime.get)
+ assert(value.measurements == MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements))
}

"createCheckpoint" should "take measurements and create a Checkpoint, multiple measure changes" in {
@@ -167,25 +168,27 @@

val argumentFirst = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent, times(1)).saveCheckpoint(argumentFirst.capture())
+ val valueFirst: CheckpointDTO = argumentFirst.getValue

- assert(argumentFirst.getValue.name == "checkPointNameCount")
- assert(argumentFirst.getValue.author == authorTest)
- assert(argumentFirst.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
- assert(argumentFirst.getValue.measurements.head.result.mainValue.value == "4")
- assert(argumentFirst.getValue.measurements.head.result.mainValue.valueType == ResultValueType.LongValue)
+ assert(valueFirst.name == "checkPointNameCount")
+ assert(valueFirst.author == authorTest)
+ assert(valueFirst.partitioning == atumPartitions.toPartitioningDTO)
+ assert(valueFirst.measurements.head.result.mainValue.value == "4")
+ assert(valueFirst.measurements.head.result.mainValue.valueType == ResultValueType.LongValue)

atumContext.addMeasure(SumOfValuesOfColumn("columnForSum"))
when(mockAgent.currentUser).thenReturn(authorTest + "Another") // maybe a process changed the author / current user
df.createCheckpoint("checkPointNameSum")

val argumentSecond = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent, times(2)).saveCheckpoint(argumentSecond.capture())
+ val valueSecond: CheckpointDTO = argumentSecond.getValue

- assert(argumentSecond.getValue.name == "checkPointNameSum")
- assert(argumentSecond.getValue.author == authorTest + "Another")
- assert(argumentSecond.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
- assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.value == "22.5")
- assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimalValue)
+ assert(valueSecond.name == "checkPointNameSum")
+ assert(valueSecond.author == authorTest + "Another")
+ assert(valueSecond.partitioning == atumPartitions.toPartitioningDTO)
+ assert(valueSecond.measurements.tail.head.result.mainValue.value == "22.5")
+ assert(valueSecond.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimalValue)
}

"addAdditionalData" should "add key/value pair to map for additional data" in {
@@ -21,9 +21,10 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructT
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumAgent
- import za.co.absa.atum.agent.AtumContext.{AtumPartitions, DatasetWrapper}
+ import za.co.absa.atum.agent.AtumContext.DatasetWrapper
import za.co.absa.atum.agent.model.AtumMeasure._
import za.co.absa.atum.model.ResultValueType
+ import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.spark.commons.test.SparkTestBase

class AtumMeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase { self =>
@@ -19,11 +19,11 @@ package za.co.absa.atum.agent.model
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumAgent
- import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.AtumMeasure.{AbsSumOfValuesOfColumn, RecordCount, SumOfHashesOfColumn, SumOfValuesOfColumn}
import za.co.absa.spark.commons.test.SparkTestBase
import za.co.absa.atum.agent.AtumContext._
import za.co.absa.atum.model.ResultValueType
+ import za.co.absa.atum.model.types.basic.AtumPartitions

class MeasureUnitTests extends AnyFlatSpec with Matchers with SparkTestBase { self =>

@@ -16,19 +16,26 @@

package za.co.absa.atum.model.envelopes

+ import io.circe.parser.decode
import io.circe._
import io.circe.generic.semiauto._

import java.util.UUID

object ErrorResponse {
implicit val decodeErrorResponse: Decoder[ErrorResponse] = deriveDecoder
implicit val encodeErrorResponse: Encoder[ErrorResponse] = deriveEncoder
+ def basedOnStatusCode(statusCode: Int, jsonString: String): Either[Error, ErrorResponse] = {
+ statusCode match {
+ case 400 => decode[BadRequestResponse](jsonString)
+ case 401 => decode[UnauthorizedErrorResponse](jsonString)
+ case 404 => decode[NotFoundErrorResponse](jsonString)
+ case 409 => decode[ConflictErrorResponse](jsonString)
+ case 500 => decode[InternalServerErrorResponse](jsonString)
+ case _ => decode[GeneralErrorResponse](jsonString)
+ }
+ }
}

- sealed trait ErrorResponse extends ResponseEnvelope {
- def message: String
- }
+ sealed trait ErrorResponse extends ResponseEnvelope

final case class BadRequestResponse(message: String, requestId: UUID = UUID.randomUUID()) extends ErrorResponse

@@ -71,3 +78,11 @@ object ErrorInDataErrorResponse {
implicit val decoderInternalServerErrorResponse: Decoder[ErrorInDataErrorResponse] = deriveDecoder
implicit val encoderInternalServerErrorResponse: Encoder[ErrorInDataErrorResponse] = deriveEncoder
}

+ final case class UnauthorizedErrorResponse(message: String, requestId: UUID = UUID.randomUUID()) extends ErrorResponse
+
+ object UnauthorizedErrorResponse {
+ implicit val decoderInternalServerErrorResponse: Decoder[UnauthorizedErrorResponse] = deriveDecoder
+ implicit val encoderInternalServerErrorResponse: Encoder[UnauthorizedErrorResponse] = deriveEncoder
+ }
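
For orientation, here is a hedged sketch of how a caller — for example the Reader's HTTP layer this PR is about — might use the new `ErrorResponse.basedOnStatusCode` to turn a failed response into a typed error. The `ErrorMapping` object, its fallback behaviour, and the assumption that `InternalServerErrorResponse` takes a message with a defaulted request ID (like the other responses shown) are illustrative, not code from this PR:

```scala
import za.co.absa.atum.model.envelopes.{ErrorResponse, InternalServerErrorResponse}

// Hypothetical caller-side helper (not part of this PR): decode the error body according
// to the HTTP status code, and fall back to a generic InternalServerErrorResponse when the
// body cannot be decoded at all.
object ErrorMapping {
  def fromFailedResponse(statusCode: Int, body: String): ErrorResponse =
    ErrorResponse.basedOnStatusCode(statusCode, body) match {
      case Right(typedError)  => typedError
      case Left(decodingError) =>
        InternalServerErrorResponse(s"Unparsable error payload (HTTP $statusCode): ${decodingError.getMessage}")
    }
}

// Usage sketch: a 404 whose body is a NotFoundErrorResponse JSON document comes back as the
// corresponding case class, still typed as ErrorResponse:
// val err = ErrorMapping.fromFailedResponse(404, """{"message":"not found","requestId":"..."}""")
```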
