Skip to content

Commit

Permalink
Allow an account specific allow list for some categories
Browse files Browse the repository at this point in the history
  • Loading branch information
mpalriwal-Netflix committed Oct 31, 2024
1 parent a1b999f commit 83d5756
Show file tree
Hide file tree
Showing 13 changed files with 179 additions and 10 deletions.
54 changes: 54 additions & 0 deletions atlas-cloudwatch/src/main/resources/ec2.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ atlas {
timeout = 20m
end-period-offset = 1

accounts = []

dimensions = [
"InstanceId"
]
Expand Down Expand Up @@ -148,6 +150,58 @@ atlas {
]
}

ec2-asg = {
namespace = "AWS/EC2"
period = 1m
timeout = 20m
end-period-offset = 1

dimensions = [
"AutoScalingGroupName"
]

metrics = [
{
name = "StatusCheckFailed"
alias = "aws.ec2.statusCheckFailed"
conversion = "max"
},
{
name = "StatusCheckFailed_Instance"
alias = "aws.ec2.badInstances"
conversion = "max"
tags = [
{
key = "id"
value = "instance"
}
]
},
{
name = "StatusCheckFailed_System"
alias = "aws.ec2.badInstances"
conversion = "max"
tags = [
{
key = "id"
value = "system"
}
]
},
{
name = "StatusCheckFailed_AttachedEBS"
alias = "aws.ec2.badInstances"
conversion = "max"
tags = [
{
key = "id"
value = "ebs"
}
]
}
]
}

ec2-instance = ${atlas.cloudwatch.ec2} {
dimensions = [
"InstanceId"
Expand Down
1 change: 1 addition & 0 deletions atlas-cloudwatch/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ atlas {
"dynamodb-table-ops",
"dynamodb-capacity",
"dynamodb-replication",
"ec2-asg",
"ec2",
"ec2-api",
"ec2-credit",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ abstract class CloudWatchMetricsProcessor(
private[cloudwatch] val pollingDefinedMetric =
registry.createId("atlas.cloudwatch.datapoints.filtered", "reason", "polling")

/** The number of data points filtered out before caching because of the category's account allow list. */
private[cloudwatch] val accountBlockedMetric =
registry.createId("atlas.cloudwatch.datapoints.filtered", "reason", "account")

/** The number of data points filtered out before caching due to not having a metric config. */
private[cloudwatch] val filteredMetric =
registry.createId("atlas.cloudwatch.datapoints.filtered", "reason", "metric")
Expand Down Expand Up @@ -205,7 +209,22 @@ abstract class CloudWatchMetricsProcessor(
} else {
if (category.dimensionsMatch(dp.dimensions)) {
matchedDimensions = true
if (category.filter.isDefined && tuple._1.filter.get.matches(toTagMap(dp))) {
if (
category.accounts.isDefined && !category.accountMatch(
dp.dimensions,
category.accounts.get
)
) {
registry
.counter(
accountBlockedMetric
.withTag("aws.namespace", dp.namespace)
.withTag("aws.metric", dp.metricName)
)
.increment()
} else if (
category.filter.isDefined && tuple._1.filter.get.matches(toTagMap(dp))
) {
registry
.counter(filteredQuery.withTag("aws.namespace", dp.namespace))
.increment()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ import scala.concurrent.duration.DurationInt
* ELB data is reported overall for the load balancer and by zone. For Atlas it
* is better to map in the most granular form and allow the aggregate to be done
* dynamically at query time.
* @param accounts
* If defined, only data points from the listed accounts are allowed for this category.
* By default all accounts are allowed.
* @param metrics
* The set of metrics to fetch and metadata for how to convert them.
* @param filter
Expand All @@ -70,6 +73,7 @@ case class MetricCategory(
period: Int,
graceOverride: Int,
dimensions: List[String],
accounts: Option[List[String]] = None,
metrics: List[MetricDefinition],
filter: Option[Query],
pollOffset: Option[Duration] = None
Expand Down Expand Up @@ -124,6 +128,10 @@ case class MetricCategory(
}
!extraTag && matched == dimensions.size
}

/**
  * Check whether a data point carries an account that is on the allow list.
  *
  * @param tags
  *     Dimensions of the data point to inspect.
  * @param allowList
  *     Account values that are permitted.
  * @return
  *     True if at least one dimension is named `nf.account` and its value is
  *     contained in `allowList`; false otherwise, including when no
  *     `nf.account` dimension is present.
  */
def accountMatch(tags: List[Dimension], allowList: List[String]): Boolean = {
  // `==` is null-safe, unlike calling `.equals` on a possibly-null name.
  tags.exists(d => d.name() == "nf.account" && allowList.contains(d.value()))
}
}

object MetricCategory extends StrictLogging {
Expand Down Expand Up @@ -156,6 +164,9 @@ object MetricCategory extends StrictLogging {
if (config.hasPath("grace-override")) config.getInt("grace-override") else -1
val pollOffset =
if (config.hasPath("poll-offset")) Some(config.getDuration("poll-offset")) else None
val accounts =
if (config.hasPath("accounts")) Some(config.getStringList("accounts").asScala.toList)
else None

apply(
namespace = config.getString("namespace"),
Expand All @@ -164,6 +175,7 @@ object MetricCategory extends StrictLogging {
dimensions = config.getStringList("dimensions").asScala.toList,
metrics = metrics.flatMap(MetricDefinition.fromConfig),
filter = filter,
accounts = accounts,
pollOffset = pollOffset
)
}
Expand Down
1 change: 1 addition & 0 deletions atlas-cloudwatch/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ atlas {
"ut1",
"ut5",
"ut-ec2",
"ut-node",
"ut-asg",
"ut-timeout",
"ut-offset",
Expand Down
20 changes: 20 additions & 0 deletions atlas-cloudwatch/src/test/resources/test-rules.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,26 @@ atlas {
]
}

ut-node = {
namespace = "AWS/UT1"
period = 1m

accounts = [
"12345"
]

dimensions = [
"InstanceId"
]

metrics = [
{
name = "Ec2InstanceIdMetric"
alias = "aws.ec2.ec2InstanceIdMetric"
conversion = "sum,rate"
}
]
}
ut-ec2 = {
namespace = "AWS/UT1"
period = 5m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class BaseCloudWatchMetricsProcessorSuite extends FunSuite with TestKitBase with
val config = ConfigFactory.load()
val tagger = new NetflixTagger(config.getConfig("atlas.cloudwatch.tagger"))
val rules: CloudWatchRules = new CloudWatchRules(config)
val category = MetricCategory("AWS/DynamoDB", 60, -1, List("MyTag"), List.empty, null)
val category5m = MetricCategory("AWS/DynamoDB", 300, -1, List("MyTag"), List.empty, null)
// Shared categories with periods of 60 and 300. `accounts` is an Option, so use None
// (no allow list) rather than null, which would throw if `accounts.isDefined` is evaluated.
val category = MetricCategory("AWS/DynamoDB", 60, -1, List("MyTag"), None, List.empty, null)
val category5m = MetricCategory("AWS/DynamoDB", 300, -1, List("MyTag"), None, List.empty, null)

val cwDP = newCacheEntry(
makeFirehoseMetric(Array(39.0, 1.0, 7.0, 19), ts(-2.minutes)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,62 @@ class CWMPProcessSuite extends BaseCloudWatchMetricsProcessorSuite {
assertCounters(0, publishEmpty = 1, scraped = 1)
}

// Verifies that a data point without an allowed `nf.account` dimension is not published
// and is counted under the "account" filter reason.
test("processDatapoints account blocked") {
processor.processDatapoints(
List(
makeFirehoseMetric(
"AWS/UT1",
"Ec2InstanceIdMetric",
// Only InstanceId is supplied; there is no nf.account dimension for the allow list to match.
List(Dimension.builder().name("InstanceId").value("123").build()),
Array(1.0, 1.0, 1.0, 1),
"Count"
)
),
ts
)
// Nothing should be published for the blocked data point.
assertPublished(List.empty)
assertCounters(
1,
// Expect one "account" filtered count for the metric and none for the namespace reason.
filtered = Map("namespace" -> (0, "AWS/UT1"), "account" -> (1, "Ec2InstanceIdMetric"))
)
}

// Verifies that a data point whose `nf.account` dimension is on the allow list is published.
test("processDatapoints account allowed") {
processor.processDatapoints(
List(
makeFirehoseMetric(
"AWS/UT1",
"Ec2InstanceIdMetric",
List(
Dimension.builder().name("InstanceId").value("abc").build(),
// "12345" is the account listed in the ut-node test rule's `accounts` setting.
Dimension.builder().name("nf.account").value("12345").build()
),
Array(1.0, 1.0, 1.0, 1),
"Count"
)
),
ts
)
assertPublished(
List(
com.netflix.atlas.core.model.Datapoint(
Map(
"name" -> "aws.ec2.ec2InstanceIdMetric",
"nf.region" -> "us-west-2",
"nf.app" -> "cloudwatch",
"atlas.dstype" -> "rate",
"nf.cluster" -> "cloudwatch",
"nf.node" -> "abc",
"nf.account" -> "12345"
),
ts,
// NOTE(review): presumably 1.0 / 60 from the "sum,rate" conversion over a 1m period — confirm.
0.016666666666666666
)
)
)
// No filtered counters are expected when the account is allowed.
assertCounters(1)
}

test("processDatapoints No namespace config") {
processor.processDatapoints(
List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class CWMPPublishPointSuite extends BaseCloudWatchMetricsProcessorSuite {
val cache = ce(
List(cwv(-5.minutes, -4.minutes, false))
)
val category = MetricCategory("AWS/DynamoDB", 60, 4, List("MyTag"), List.empty, null)
// `accounts` is an Option; pass None (no allow list) instead of null.
val category = MetricCategory("AWS/DynamoDB", 60, 4, List("MyTag"), None, List.empty, null)
assertPublishPoint(processor.getPublishPoint(cache, nts, category), 0, cache, true)
assertMetrics(
grace = 4.minutes.toMillis,
Expand Down Expand Up @@ -439,6 +439,7 @@ class CWMPPublishPointSuite extends BaseCloudWatchMetricsProcessorSuite {
60,
-1,
List("MyTag"),
null,
List(MetricDefinition("SumRate", "sum.rate", null, true, Map.empty)),
null
)
Expand All @@ -457,6 +458,7 @@ class CWMPPublishPointSuite extends BaseCloudWatchMetricsProcessorSuite {
60,
-1,
List("MyTag"),
null,
List(MetricDefinition("SumRate", "sum.rate", null, true, Map.empty)),
null
)
Expand All @@ -475,6 +477,7 @@ class CWMPPublishPointSuite extends BaseCloudWatchMetricsProcessorSuite {
60,
-1,
List("MyTag"),
null,
List(MetricDefinition("SumRate", "sum.rate", null, true, Map.empty)),
null
)
Expand All @@ -493,6 +496,7 @@ class CWMPPublishPointSuite extends BaseCloudWatchMetricsProcessorSuite {
60,
-1,
List("MyTag"),
null,
List(MetricDefinition("SumRate", "sum.rate", null, true, Map.empty)),
null
)
Expand All @@ -513,6 +517,7 @@ class CWMPPublishPointSuite extends BaseCloudWatchMetricsProcessorSuite {
60,
-1,
List("MyTag"),
null,
List(MetricDefinition("SumRate", "sum.rate", null, true, Map.empty)),
null
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class ConversionsSuite extends FunSuite {
test("rate") {
val cnv = Conversions.fromName("sum,rate")
val meta = MetricMetadata(
MetricCategory("NFLX/Test", 300, -1, Nil, Nil, Some(Query.True)),
MetricCategory("NFLX/Test", 300, -1, Nil, null, Nil, Some(Query.True)),
MetricDefinition("test", "test-alias", cnv, false, Map.empty),
Nil
)
Expand All @@ -90,7 +90,7 @@ class ConversionsSuite extends FunSuite {
test("rate already") {
val cnv = Conversions.fromName("sum,rate")
val meta = MetricMetadata(
MetricCategory("NFLX/Test", 300, -1, Nil, Nil, Some(Query.True)),
MetricCategory("NFLX/Test", 300, -1, Nil, null, Nil, Some(Query.True)),
MetricDefinition("test", "test-alias", cnv, false, Map.empty),
Nil
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class MetricDataSuite extends FunSuite {
60,
-1,
List("dimension"),
null,
List(definition),
Some(Query.True)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import java.time.Instant
class MetricDefinitionSuite extends FunSuite {

private val meta = MetricMetadata(
MetricCategory("AWS/ELB", 60, -1, Nil, Nil, Some(Query.True)),
MetricCategory("AWS/ELB", 60, -1, Nil, null, Nil, Some(Query.True)),
null,
Nil
)
Expand Down Expand Up @@ -248,7 +248,7 @@ class MetricDefinitionSuite extends FunSuite {
.build()

val metadata = MetricMetadata(
MetricCategory("AWS/RDS", 60, -1, Nil, Nil, Some(Query.True)),
MetricCategory("AWS/RDS", 60, -1, Nil, null, Nil, Some(Query.True)),
definition,
Nil
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class RedisClusterCloudWatchMetricsProcessorSuite extends FunSuite with TestKitB
val rules: CloudWatchRules = new CloudWatchRules(config)

val category =
MetricCategory("AWS/UTRedis", 60, -1, List("node", "key"), List.empty, null)
MetricCategory("AWS/UTRedis", 60, -1, List("node", "key"), null, List.empty, null)

val firehoseMetric = makeFirehoseMetric(
"AWS/UTRedis",
Expand Down Expand Up @@ -971,7 +971,7 @@ class RedisClusterCloudWatchMetricsProcessorSuite extends FunSuite with TestKitB

def makePayload(node: String, key: Long): Array[Byte] = {
val category =
MetricCategory("AWS/UTRedis", 60, -1, List("node", "key"), List.empty, null)
MetricCategory("AWS/UTRedis", 60, -1, List("node", "key"), null, List.empty, null)
val cwDP = newCacheEntry(
makeFirehoseMetric(
"AWS/UTRedis",
Expand Down

0 comments on commit 83d5756

Please sign in to comment.