From 60be81cbb819ea4c5530cd8af21f9a75f9905ed3 Mon Sep 17 00:00:00 2001 From: AnalyticJeremy Date: Mon, 22 May 2023 13:48:08 +0000 Subject: [PATCH] initial build --- Acquire Data.py | 635 ++++++++++++++++++++++++++++++++++++++++++++++++ Analyze Data.py | 338 ++++++++++++++++++++++++++ README.md | 31 ++- 3 files changed, 1000 insertions(+), 4 deletions(-) create mode 100644 Acquire Data.py create mode 100644 Analyze Data.py diff --git a/Acquire Data.py b/Acquire Data.py new file mode 100644 index 0000000..a48661c --- /dev/null +++ b/Acquire Data.py @@ -0,0 +1,635 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Phase 1: Acquire Data +# MAGIC +# MAGIC This notebook will get data about your Azure Databricks VM usage. It looks at the last 12 days of the +# MAGIC [Azure Activity Logs](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/activity-log) for the +# MAGIC Databricks managed resource groups to determine what VMs are created and how long they are used. +# MAGIC +# MAGIC Through this process, we will build a tiny little lakehouse to store the Databricks VM usage data in its +# MAGIC various stages of processing. The Azure Activity Log data is saved in its raw form as JSON data. +# MAGIC This notebook then uses Spark to transform that raw data into Delta files in a Bronze, Silver, and Gold +# MAGIC layer. Each of these steps further transforms the data and prepares it for analysis. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC The Azure Activity Log can be accessed through the Azure Management API. Rather than making raw REST calls, we'll use the +# MAGIC [Azure Libraries for Python](https://learn.microsoft.com/en-us/azure/developer/python/sdk/azure-sdk-overview). We can use the `%pip` +# MAGIC magic command to install them in our current Python session. + +# COMMAND ---------- + +# MAGIC %pip install azure-identity azure-mgmt-monitor azure-mgmt-resourcegraph azure-mgmt-resource azure-mgmt-compute + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Configuration +# MAGIC +# MAGIC These are the configuration settings you will need to adjust to specify how the data acquisition process should operate. +# MAGIC +# MAGIC **`service_principal`** contains the information about the service principal that you will use to connect to the Azure Management API. +# MAGIC If you used the `az ad sp create-for-rbac` CLI command to create the service principal, then you can just straight copy and paste the +# MAGIC output of that command here. +# MAGIC +# MAGIC **`subscription_ids`** is a Python list of the subscription ID's for which you want to gather Databricks VM usage data. If you leave +# MAGIC the list empty, then this process will pull Databricks VM usage data for all subscriptions to which the service principal has access. +# MAGIC +# MAGIC **`delta_location`** is the path where the Databricks VM usage data will be saved. It can be a location on the DBFS or it can use +# MAGIC any standard HDFS path (*e.g.* an `abfss://` path) +# MAGIC +# MAGIC **`days_to_pull`** specifies how many days into the past the Azure Activity Log should be queried. By default, the Azure Activity Log +# MAGIC retains 90 days of data. Specifying a lower number will speed up the process because there will be fewer queries made. However, +# MAGIC you will want to pull several days of data to get a clearer picture of the usage patterns. +# MAGIC +# MAGIC **`query_window_hours`** controls how many hours will be covered by each query to the Azure Activity Log. 
Instead of issuing one, big +# MAGIC query to the Activity Log to pull all of the history at once, this notebook will break it up into multiple queries. If you don't have +# MAGIC a lot of Databricks VM activity, you can set this number higher. For example, setting it to "24" will issue one query per day. If +# MAGIC you have a high volume of Databricks VM activity, set this number lower. This will break the querying process into smaller, more +# MAGIC reliable chunks. If you're not sure, just leave it at 4. + +# COMMAND ---------- + +service_principal = { + "appId": "", + "displayName": "", + "password": "", + "tenant": "" +} + +subscription_ids = [] + +delta_location = "dbfs:/vm-observation" + +days_to_pull = 12 +query_window_hours = 4 + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Raw Data +# MAGIC +# MAGIC Read the Azure Activity Logs and save the results as JSON files. + +# COMMAND ---------- + +import azure.mgmt.resourcegraph as arg +from datetime import datetime, timedelta +from azure.identity import ClientSecretCredential, AzureCliCredential +from azure.mgmt.resource import SubscriptionClient +from azure.mgmt.monitor import MonitorManagementClient + + +credential = ClientSecretCredential(service_principal["tenant"], service_principal["appId"], service_principal["password"]) +#credential = AzureCliCredential() # (Optional... if you want to use Azure CLI to login instead of using a service principal) + +while delta_location.endswith("/"): + delta_location = delta_location[:-1] + +# If we were not given a list of subscriptions, just get all of the ones the SP has access to +if not subscription_ids: + subscriptions_client = SubscriptionClient(credential) + subscription_ids = [s.as_dict()['subscription_id'] for s in subscriptions_client.subscriptions.list()] + + +def get_adbx_workspaces(): + arg_client = arg.ResourceGraphClient(credential) + query_options = arg.models.QueryRequestOptions(result_format = "objectArray") + + query_string = 'resources | where type == "microsoft.databricks/workspaces" | project workspaceId = id, name, subscriptionId, sku = sku.name, managedResourceGroupId = properties.managedResourceGroupId, location' + query_request = arg.models.QueryRequest(subscriptions=subscription_ids, query=query_string, options=query_options) + results = arg_client.resources(query_request) + + output = results.as_dict()["data"] + for o in output: + o["managedResourceGroupName"] = o["managedResourceGroupId"].split('/')[-1] + + monitor_client = MonitorManagementClient(credential, o["subscriptionId"]) + diagnostic_settings = monitor_client.diagnostic_settings.list(o["workspaceId"]) + o["diagnostic_settings"] = [s.as_dict() for s in diagnostic_settings] + + return output + + +def get_activity_log_for_workspace(workspace, date): + print(f"{datetime.now()}\n - Getting activity log for workspace \"{workspace['name']}\" {date.date()} {date.hour}:00") + monitor_client = MonitorManagementClient(credential, workspace["subscriptionId"]) + + start_time = date - timedelta(hours=query_window_hours) + timedelta(microseconds=1) + end_time = date + filter = f"eventTimestamp ge '{start_time.isoformat()}' and eventTimestamp le '{end_time.isoformat()}' and resourceGroupName eq '{workspace['managedResourceGroupName']}' and resourceType eq 'Microsoft.Compute/virtualMachines'" + + log_entries = monitor_client.activity_logs.list(filter) + return [i.as_dict() for i in log_entries] + +# COMMAND ---------- + +import json + +workspaces = get_adbx_workspaces() 
+dbutils.fs.put(f"{delta_location}/raw/workspaces/workspaces.json", json.dumps({"workspaces": workspaces, "capture_time": datetime.now().isoformat() + "Z"})) + +ws_dict = {ws['managedResourceGroupName']: ws for ws in workspaces} +timestamp = datetime.now() + +i = 0 +for rg_name in ws_dict: + ws = ws_dict[rg_name] + i = i + 1 + + print(f"\n*************************\nProcessing workspace {ws['name']} ({i} of {len(ws_dict)})") + json_path = f"{delta_location}/raw/activity_logs/subscription={ws['subscriptionId']}/rg={rg_name}" + + for h in range(0, 24 * days_to_pull, query_window_hours): + date = timestamp - timedelta(hours=h) + logs = get_activity_log_for_workspace(ws, date) + + print(f" - Parsing response body JSON") + for log in logs: + if "properties" in log: + if "responseBody" in log["properties"]: + response_body = json.loads(log["properties"]["responseBody"]) + log["properties"]["responseBody"] = response_body + + json_filename = f"{json_path}/{date.date()}-{date.hour}.json" + print(f" - Saving log to file: {json_filename}") + dbutils.fs.put(json_filename, json.dumps({"workspace": ws, "logs": logs, "capture_time": datetime.now().isoformat() + "Z"})) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### Get VM SKU Information +# MAGIC In addition to the VM usage logs, we also need to get some information about the VM SKU's. We will pull it from the Azure API just to be sure +# MAGIC we always have the latest data. The available VM types can vary by subscription and by region so we will have to query with multiple sets of +# MAGIC parameters. + +# COMMAND ---------- + +from azure.mgmt.compute import ComputeManagementClient + +sub_locs = [(ws['subscriptionId'], ws['location']) for ws in workspaces] +sub_locs = list(set(sub_locs)) + +for sub_loc in sub_locs: + print(f"Getting VM SKUs for subscription {sub_loc[0]} in {sub_loc[1]} region") + compute_client = ComputeManagementClient(credential, sub_loc[0]) + results = compute_client.virtual_machine_sizes.list(sub_loc[1]) + results = [i.as_dict() for i in results] + + for r in results: + r["subscription_id"] = sub_loc[0] + r["location"] = sub_loc[1] + + json_path = f"{delta_location}/raw/vm_skus/subscription={sub_loc[0]}" + json_filename = f"{json_path}/{sub_loc[1]}.json" + dbutils.fs.put(json_filename, json.dumps(results)) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Bronze Layer +# MAGIC +# MAGIC This is the first step in transforming our raw data. We will simply read the JSON files and apply only the bare minimum of transformations +# MAGIC before converting it to Delta files. + +# COMMAND ---------- + +import pyspark.sql.functions as F + +spark.conf.set("spark.sql.caseSensitive", "true") + +df = spark.read \ + .format("json") \ + .load(delta_location + "/raw/activity_logs") \ + .selectExpr("workspace.workspaceId", "workspace.name AS workspaceName", "workspace.sku AS workspaceSku", "workspace.subscriptionId", + "workspace.managedResourceGroupId", "CAST(capture_time AS TIMESTAMP) AS capture_time", "explode(logs) AS log_entry") \ + .selectExpr("CAST(log_entry.event_timestamp AS TIMESTAMP) AS event_timestamp", "lower(split(log_entry.resource_id, '/')[8]) as vm_id", "*") \ + .cache() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC There are some columns that we definitely won't need for analysis and they have some challenging column names. So we will make life +# MAGIC easier for us and just drop those columns. 
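+# MAGIC
+# MAGIC For example, the `claims` struct in each log entry holds the caller's token claims, and many of those fields use full URIs as names (something like `http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name`), which is why the next cell backtick-escapes each field name before dropping it.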
+ +# COMMAND ---------- + +claim_cols = df.select("log_entry.claims.*").columns + +for col in claim_cols: + if col.startswith("http") and ":/" in col: + df = df.withColumn("log_entry", F.col("log_entry").dropFields("claims.`" + col + "`")) + +df = df.withColumn("log_entry", F.col("log_entry").dropFields("properties.responseBody.identity")) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Each of the Azure tags applied to a Databricks VM is represented as its own field in a nested column. However, Delta does not allow the names of two +# MAGIC columns to vary only by case. For example, Delta won't let you have a column called `project_name` and `Project_Name` in the same table. Across a large +# MAGIC organization, it's easy for Azure tags to have inconsistent casing in their names. Therefore, as we convert the raw data to Delta, we have to find +# MAGIC Azure tag columns that have the same name but different letter casing and combine those into one column. + +# COMMAND ---------- + +tagnames = df.select("log_entry.properties.responseBody.tags.*").columns + +instance_pool_tagname = "DatabricksInstancePoolId" +if instance_pool_tagname not in tagnames: + print("Instance Pool ID column not in JSON schema. Adding it to Delta file.") + df = df.withColumn("log_entry", F.col("log_entry").withField("properties.responseBody.tags." + instance_pool_tagname, F.lit(None).astype("string"))) + + +tagname_groups = {} +for tagname in tagnames: + tagname_groups.setdefault(tagname.lower(), []).append(tagname) + +for key in tagname_groups: + values = tagname_groups[key] + if len(values) > 1: + keep = values[0] + for i in range(1, len(values)): + print(f"combine tag \"{values[i]}\" into tag \"{keep}\"") + df = df.withColumn("log_entry", + F.col("log_entry").withField("properties.responseBody.tags." + keep, + F.coalesce( + F.col("log_entry.properties.responseBody.tags." + keep), + F.col("log_entry.properties.responseBody.tags." + values[i]) + ) + ) + ) + df = df.withColumn("log_entry", F.col("log_entry").dropFields("properties.responseBody.tags." + values[i])) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Delta has some restrictions on what characters can be used in column names. Most notabaly, this includes spaces. We will look through +# MAGIC all of the Azure tag columns and change any invalid characters to underscores. + +# COMMAND ---------- + +tagnames = df.select("log_entry.properties.responseBody.tags.*").columns +bad_chars = " ,;{}()\n\t=" + +for tagname in tagnames: + for c in bad_chars: + if c in tagname: + new_tagname = tagname.replace(c, "_") + print(f"Change tag \"{tagname}\" to \"{new_tagname}") + + if new_tagname in tagnames: + df = df.withColumn("log_entry", + F.col("log_entry").withField("properties.responseBody.tags." + new_tagname, + F.coalesce( + F.col("log_entry.properties.responseBody.tags." + new_tagname), + F.col("log_entry.properties.responseBody.tags." + tagname) + ) + ) + ) + else: + df = df.withColumn("log_entry", + F.col("log_entry").withField( + "properties.responseBody.tags." + new_tagname, + F.col("log_entry.properties.responseBody.tags." + tagname) + ) + ) + + df = df.withColumn("log_entry", F.col("log_entry").dropFields("properties.responseBody.tags.`" + tagname + "`")) + +# COMMAND ---------- + +df.write.format("delta").mode("overwrite").save(delta_location + "/bronze/activity_logs") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### VM SKU Information +# MAGIC We also need to convert our VM SKU data from JSON to Delta! 
This is much simpler, though, because the data is not complex. + +# COMMAND ---------- + +df = spark.read \ + .format("json") \ + .load(delta_location + "/raw/vm_skus") + +df.write.format("delta").mode("overwrite").save(delta_location + "/bronze/vm_skus") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Silver Layer +# MAGIC +# MAGIC Now that we have our data in the easy-to-use Delta format, we can begin transforming it into a format that is more useful for analysis. +# MAGIC The raw data contains a log of all of the VM operations that Azure performed in the Databricks-managed resource groups. However many +# MAGIC of those operations aren't useful for our analysis. We only want to know when a new VM was created or an existing VM was deleted. +# MAGIC (Though we won't use it for this analysis, we'll also record when spot instances are evicted because that could be interesting to +# MAGIC study.) + +# COMMAND ---------- + +import pyspark.sql.functions as F + +df = spark.read.format("delta").load(delta_location + "/bronze/activity_logs") + +# Get rows for the creation, eviction, or deletion of a VM +activities = df \ + .filter(""" + (log_entry.properties.responseBody IS NOT NULL AND log_entry.resource_type.value != 'Microsoft.Compute/virtualMachines/extensions') + OR lower(log_entry.operation_name.value) LIKE '%spot%' + OR (log_entry.operation_name.value == 'Microsoft.Compute/virtualMachines/delete' AND log_entry.event_name.value == 'EndRequest') + """) \ + .selectExpr("vm_id", "event_timestamp", "log_entry.properties.statusCode", "log_entry.operation_name.value AS operation", "subscriptionId", + "workspaceName", "log_entry.properties.responseBody.properties.hardwareProfile.vmSize", "log_entry.properties.responseBody.properties.priority", + "log_entry.properties.responseBody.location", "log_entry.properties.responseBody.tags") \ + .orderBy("vm_id", "event_timestamp") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC For our analysis, we need the Azure tags associated with the VM. This tells us which job, cluster, or pool the VM was part of. However, we only +# MAGIC get the Azure tag data when a VM is created. If a VM was started before the period of time we are studying, we won't have that tag data in our logs. +# MAGIC Therefore, we will just throw out any data for VM's that are missing this critical data. + +# COMMAND ---------- + +# Remove all information about VMs if we don't have any tags for it +activities.createOrReplaceTempView("activities_table") + +no_starts = spark.sql("SELECT vm_id, COUNT(*), MIN(event_timestamp), MAX(event_timestamp) FROM activities_table WHERE vm_id NOT IN (SELECT vm_id FROM activities_table WHERE tags IS NOT NULL) GROUP BY vm_id ORDER BY 3 DESC") +print(f"\nRemoving data for {no_starts.count():,} VMs because there is no creation data in the logs.\n") +no_starts.show(truncate=False) + +activities = spark.sql("SELECT * FROM activities_table WHERE vm_id IN (SELECT vm_id FROM activities_table WHERE tags IS NOT NULL)") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Since not all data points (like Azure tag data) is present in every log entry, we will order the logs by VM ID and timestamp. We will carry forward +# MAGIC these data points to all future rows that are missing the data. +# MAGIC +# MAGIC That is to say, if we have sparse data like this... 
+# MAGIC +# MAGIC | VM ID | Timestamp | VM Size | +# MAGIC |-------|-----------|----------| +# MAGIC | 1234 | 13:38:34 | E8ds_v4 | +# MAGIC | 1234 | 13:42:18 | *null* | +# MAGIC | 1234 | 13:46:12 | *null* | +# MAGIC | 1234 | 13:47:09 | *null* | +# MAGIC | 1234 | 13:49:58 | E16ds_v4 | +# MAGIC | 1234 | 13:50:02 | *null* | +# MAGIC | 5678 | 12:17:24 | DS3_v3 | +# MAGIC | 5678 | 12:22:32 | *null* | +# MAGIC | 5678 | 12:34:02 | *null* | +# MAGIC +# MAGIC ... we will carry forward the values in the sparse column so it looks like this: +# MAGIC +# MAGIC | VM ID | Timestamp | VM Size | +# MAGIC |-------|-----------|----------| +# MAGIC | 1234 | 13:38:34 | E8ds_v4 | +# MAGIC | 1234 | 13:42:18 | E8ds_v4 | +# MAGIC | 1234 | 13:46:12 | E8ds_v4 | +# MAGIC | 1234 | 13:47:09 | E8ds_v4 | +# MAGIC | 1234 | 13:49:58 | E16ds_v4 | +# MAGIC | 1234 | 13:50:02 | E16ds_v4 | +# MAGIC | 5678 | 12:17:24 | DS3_v3 | +# MAGIC | 5678 | 12:22:32 | DS3_v3 | +# MAGIC | 5678 | 12:34:02 | DS3_v3 | + +# COMMAND ---------- + +from pyspark.sql.window import Window + +window_spec = Window.partitionBy("vm_id").orderBy("event_timestamp") + +activities = activities \ + .withColumn("activity_number", F.row_number().over(window_spec)) \ + .withColumn("event_end", F.lead("event_timestamp").over(window_spec) - F.expr('INTERVAL 1 MICROSECOND')) \ + .withColumn("vmSize", F.last("vmSize", True).over(window_spec)) \ + .withColumn("priority", F.last("priority", True).over(window_spec)) \ + .withColumn("location", F.last("location", True).over(window_spec)) \ + .withColumn("tags", F.last("tags", True).over(window_spec)) + +print(f"Rows: {activities.count():,}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Silver processing complete! Write out this version of the data to its own Delta table. + +# COMMAND ---------- + +activities.write.format("delta").mode("overwrite").save(delta_location + "/silver/activities") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### VM SKU Information +# MAGIC This will be the final step in processing the VM SKU data. We will interpret the SKU name to get additional information about the VM series. 
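+# MAGIC
+# MAGIC For example, a SKU name like `Standard_E8ds_v5` breaks down into family group `Standard`, family `E`, 8 vCPUs, the additive features `d` (local temp disk) and `s` (premium storage), and version `v5`.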
+# MAGIC +# MAGIC For information on how the VM SKU's are named, see this document: [Azure virtual machine sizes naming conventions](https://learn.microsoft.com/en-us/azure/virtual-machines/vm-naming-conventions) + +# COMMAND ---------- + +df = spark.read.format("delta").load(delta_location + "/bronze/vm_skus") + +df = df \ + .drop("subscription") \ + .withColumn("memory_in_gb", F.expr("memory_in_mb / 1024")) \ + .withColumn("name_without_version", F.expr("REGEXP_REPLACE(name, '\_v[0-9]+', '')")) \ + .withColumn("name_parts", F.split("name", "_")) \ + .withColumn("is_promo", F.expr("name LIKE '%_Promo'")) \ + .withColumn("family_group", F.expr("name_parts[0]")) \ + .withColumn("family", F.expr("REGEXP_EXTRACT(name_parts[1], '^[A-z]+', 0)")) \ + .withColumn("name_number", F.expr("REGEXP_EXTRACT(name_parts[1], '^([A-z]+)([0-9]+)', 2)")) \ + .withColumn("constrained_vcpu", F.expr("REGEXP_EXTRACT(name_parts[1], '^([A-z]+)([0-9]+)-([0-9]+)', 3)")) \ + .withColumn("additive_features", F.expr("REGEXP_EXTRACT(name_parts[1], '^([A-z]+)([0-9]+)((-[0-9]+)?)(.*)', 5)")) \ + .withColumn("accelerator_type", F.expr("CASE WHEN name_parts[2] != 'Promo' AND name_parts[2] NOT RLIKE 'v[0-9]+' THEN name_parts[2] ELSE NULL END")) \ + .withColumn("version_number", F.expr("CASE WHEN name_parts[2] RLIKE 'v[0-9]+' THEN name_parts[2] ELSE CASE WHEN name_parts[3] RLIKE 'v[0-9]+' THEN name_parts[3] ELSE 'v1' END END")) \ + .drop("name_parts") + +df = df \ + .withColumn("is_amd", F.expr("additive_features RLIKE 'a'")) \ + .withColumn("is_block_storage_performance", F.expr("additive_features RLIKE 'b'")) \ + .withColumn("has_local_temp_disk", F.expr("additive_features RLIKE 'd'")) \ + .withColumn("is_isolated", F.expr("additive_features RLIKE 'i'")) \ + .withColumn("is_low_memory", F.expr("additive_features RLIKE 'l'")) \ + .withColumn("is_memory_intensive", F.expr("additive_features RLIKE 'm'")) \ + .withColumn("is_arm", F.expr("additive_features RLIKE 'p'")) \ + .withColumn("is_tiny_memory", F.expr("additive_features RLIKE 't'")) \ + .withColumn("has_premium_storage", F.expr("additive_features RLIKE 's'")) \ + .withColumn("is_confidential", F.expr("additive_features RLIKE 'C'")) \ + .withColumn("is_node_packing", F.expr("additive_features RLIKE 'NP'")) + +display(df) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC The Azure API does not provide information about which series a VM type belongs to. So we will copy some data from the docs and use that instead. +# MAGIC This may have to be manually updated from time to time. + +# COMMAND ---------- + +# Copied from: https://learn.microsoft.com/en-us/azure/virtual-machines/sizes + +series_data = [ + ["General purpose", + "B, Dsv3, Dv3, Dasv4, Dav4, DSv2, Dv2, Av2, DC, DCv2, Dpdsv5, Dpldsv5, Dpsv5, Dplsv5, Dv4, Dsv4, Ddv4, Ddsv4, Dv5, Dsv5, Ddv5, Ddsv5, Dasv5, Dadsv5", + "Balanced CPU-to-memory ratio. Ideal for testing and development, small to medium databases, and low to medium traffic web servers." + ], + ["Compute optimized", + "F, Fs, Fsv2, FX", + "High CPU-to-memory ratio. Good for medium traffic web servers, network appliances, batch processes, and application servers." + ], + ["Memory optimized", + "Esv3, Ev3, Easv4, Eav4, Epdsv5, Epsv5, Ev4, Esv4, Edv4, Edsv4, Ev5, Esv5, Edv5, Edsv5, Easv5, Eadsv5, Mv2, M, DSv2, Dv2", + "High memory-to-CPU ratio. Great for relational database servers, medium to large caches, and in-memory analytics." 
+ ], + ["Storage optimized", + "Lsv2, Lsv3, Lasv3", + "High disk throughput and IO ideal for Big Data, SQL, NoSQL databases, data warehousing and large transactional databases." + ], + ["GPU", + "NC, NCv2, NCv3, NCasT4_v3, ND, NDv2, NV, NVv3, NVv4, NDasrA100_v4, NDm_A100_v4", + "Specialized virtual machines targeted for heavy graphic rendering and video editing, as well as model training and inferencing (ND) with deep learning. Available with single or multiple GPUs." + ], + ["High performance compute", + "HB, HBv2, HBv3, HBv4, HC, HX", + "Our fastest and most powerful CPU virtual machines with optional high-throughput network interfaces (RDMA)." + ] +] + +series_data = [{'series': i[0], 'type': i[1].split(','), 'description': i[2]} for i in series_data] +series_data = spark.createDataFrame(series_data) + +series_data = series_data \ + .withColumn("type", F.explode("type")) \ + .withColumn("type", F.expr("TRIM(type)")) \ + .withColumn("family", F.expr("REGEXP_EXTRACT(type, '^[A-Z]+', 0)")) \ + .withColumn("version_number", F.expr("REGEXP_EXTRACT(type, 'v[0-9]+$', 0)")) \ + .withColumn("version_number", F.expr("CASE WHEN version_number == '' THEN 'v1' ELSE version_number END")) + +series_data = series_data \ + .groupBy("family", "version_number") \ + .agg(F.min("series").alias("series")) \ + .join(series_data.groupBy("series").agg(F.max("description").alias("description")), on="series", how="inner") + +df = df.join(series_data, on=["family", "version_number"], how="left_outer") + +df.write.format("delta").mode("overwrite").save(delta_location + "/silver/vm_skus") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Gold Layer +# MAGIC +# MAGIC For this last step in the data preparation, we want to explode the dataset rows so that there is an individual record for every minute that a VM was running. + +# COMMAND ---------- + +activities = spark.read.format("delta").load(delta_location + "/silver/activities") + +# COMMAND ---------- + +from datetime import datetime, timedelta +import pyspark.sql.functions as F + +dates = activities \ + .agg(F.min("event_timestamp").alias("min"), F.max("event_timestamp").alias("max")) \ + .selectExpr("*", "(TO_UNIX_TIMESTAMP(max) - TO_UNIX_TIMESTAMP(min)) / 60 AS minutes") \ + .collect() + +start_date = dates[0][0] +start_date = datetime(start_date.year, start_date.month, start_date.day, start_date.hour, start_date.minute) +minutes = dates[0][2] + +observations = spark.range(minutes) \ + .withColumn("date", F.lit(start_date) + F.expr("MAKE_INTERVAL(0, 0, 0, 0, 0, id)")) + + +observations.alias("o") \ + .join(activities.filter("operation NOT LIKE '%delete'").alias("a"), + observations["date"].between(activities["event_timestamp"], activities["event_end"])) \ + .withColumn("JobName", F.expr("REGEXP_REPLACE(REGEXP_REPLACE(a.tags.RunName, '^ADF\_', ''), '\_[a-f0-9\-]{36}$', '')")) \ + .groupBy("o.date", "a.subscriptionId", "a.workspaceName", "a.vmSize", "a.priority", "a.location", "a.tags.ClusterName", "a.tags.JobId", + "JobName", "a.tags.RunName", "a.tags.DatabricksInstancePoolId") \ + .count() \ + .orderBy("o.date") \ + .write \ + .format("delta") \ + .save(delta_location + "/gold/observations") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Data Checks +# MAGIC +# MAGIC Now that we've built our datasets, let's check them to make sure they conform to our expectations. 
+
+# COMMAND ----------
+
+df = spark.read.format("delta").load(delta_location + "/bronze/activity_logs")
+activities = spark.read.format("delta").load(delta_location + "/silver/activities")
+observations = spark.read.format("delta").load(delta_location + "/gold/observations")
+
+print(f"Bronze Rows: {df.count():,}")
+print(f"Silver Rows: {activities.count():,}")
+print(f"Gold Rows: {observations.count():,}")
+
+# COMMAND ----------
+
+print(df.filter("event_timestamp IS NULL").count())
+print(df.filter("vm_id IS NULL").count())
+print(df.filter("lower(vm_id) NOT rlike '^[0-9a-f]{32}$'").count())   # vm_id should be a 32-character hex string
+
+# COMMAND ----------
+
+display(df.groupBy("log_entry.properties.statusCode").count())
+
+# COMMAND ----------
+
+display(df.groupBy("log_entry.operation_name.value", "log_entry.properties.statusCode").count().orderBy("value", "statusCode"))
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC With Azure, there is some time between when a `DELETE` operation request is received and when it completes. This will help us see the distribution
+# MAGIC of minutes it takes to delete a VM.
+
+# COMMAND ----------
+
+import pyspark.sql.functions as F
+
+display(df.where("log_entry.operation_name.value == 'Microsoft.Compute/virtualMachines/delete'").groupBy("vm_id", "log_entry.operation_name.value").count().orderBy(F.col("count").desc()))
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC This is a quick check to see if there are any Databricks Instance Pools in use.
+
+# COMMAND ----------
+
+display(
+  df
+    .filter("log_entry.properties.responseBody.tags.DatabricksInstancePoolId IS NOT NULL")
+    .groupBy("log_entry.properties.responseBody.tags.DatabricksInstancePoolId")
+    .agg(F.count("vm_id"), F.min("event_timestamp"), F.max("event_timestamp"))
+  )
+
+# COMMAND ----------
+
+display(activities.groupBy("tags.ClusterName").count())
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC Here we'll take a quick peek at the information we came here for... how many VM's of each type are in use at any given time.
+
+# COMMAND ----------
+
+display(observations.groupBy("vmSize", "date").agg(F.sum("count").alias("count")).orderBy("date"))
+
+# COMMAND ----------
+
+display(observations.groupBy("JobName").agg(F.sum("count").alias("vms")).orderBy(F.col("vms").desc()))
diff --git a/Analyze Data.py b/Analyze Data.py
new file mode 100644
index 0000000..3fdaf14
--- /dev/null
+++ b/Analyze Data.py
@@ -0,0 +1,338 @@
+# Databricks notebook source
+# MAGIC %md
+# MAGIC # Phase 2: Analyze Data
+# MAGIC
+# MAGIC Now that we have data on Databricks VM usage, we can analyze that data to understand how many VM's are being used and when they are being used.
+# MAGIC With this data, we can see how we might optimize our compute configuration. We can also estimate how big our VM instance pools should be to
+# MAGIC satisfy the compute requirements of our jobs.
+
+# COMMAND ----------
+
+import pyspark.sql.functions as F
+
+delta_location = "dbfs:/vm-observation"
+
+observations = spark.read.format("delta").load(delta_location + "/gold/observations")
+
+# COMMAND ----------
+
+# DBTITLE 1,Workspaces With Most Usage
+display(observations.groupBy("subscriptionId", "workspaceName").agg(F.sum("count").alias("total_vm_minutes")).orderBy(F.col("total_vm_minutes").desc()))
+
+# COMMAND ----------
+
+# DBTITLE 1,Databricks Jobs With Most Usage
+display(observations.groupBy("subscriptionId", "workspaceName", "JobName").agg(F.sum("count").alias("total_vm_minutes")).orderBy(F.col("total_vm_minutes").desc()))
+
+# COMMAND ----------
+
+# DBTITLE 1,VM Usage by VM SKU
+display(observations.groupBy("subscriptionId", "workspaceName", "vmSize").agg(F.sum("count").alias("total_vm_minutes")).orderBy(F.col("total_vm_minutes").desc()))
+
+# COMMAND ----------
+
+# DBTITLE 1,Usage Summary Report
+summary = observations \
+  .groupBy("subscriptionId", "workspaceName", "vmSize", "JobName") \
+  .agg(F.sum("count").alias("count"), F.min("date").alias("start"), F.max("date").alias("end"), F.max("count").alias("count_max"), F.min("count").alias("count_min"), F.countDistinct("ClusterName").alias("cluster_count")) \
+  .withColumn("minutes", F.expr("((unix_timestamp(end) - unix_timestamp(start)) / 60) + 1")) \
+  .withColumn("vms_per_min", F.expr("count / minutes")) \
+  .orderBy("subscriptionId", "workspaceName", "vmSize", F.col("count").desc())
+
+data = summary.collect()
+
+subscriptionId = ""
+workspaceName = ""
+vmSize = ""
+
+# Build an HTML report: one header row, then rows grouped by subscription, workspace, and VM SKU
+htmlCode = """
+<table>
+  <tr>
+    <td>Job Name</td>
+    <td>Total VM<br>Minutes</td>
+    <td>Concurrent<br>VMs - Max</td>
+    <td>Concurrent<br>VMs - Min</td>
+    <td>Total<br>Minutes</td>
+    <td>VMs per<br>Minute</td>
+    <td>Number<br>of Runs</td>
+  </tr>
+"""
+
+for row in data:
+    if row[0] != subscriptionId:
+        subscriptionId = row[0]
+        htmlCode += f"<tr><td colspan=7>Subscription: {subscriptionId}</td></tr>"
+
+    if row[1] != workspaceName:
+        workspaceName = row[1]
+        htmlCode += f"<tr><td colspan=7>Workspace: {workspaceName}</td></tr>"
+
+    if row[2] != vmSize:
+        vmSize = row[2]
+        htmlCode += f"<tr><td colspan=7>VM SKU: {vmSize}</td></tr>"
+
+    jobName = row[3] if row[3] else "[non-job cluster]"
+    run_count = format(row[9], ",") if row[3] else "-"
+    htmlCode += f"<tr><td>{jobName}</td><td>{row[4]:,}</td><td>{row[7]:,}</td><td>{row[8]:,}</td><td>{int(row[10]):,}</td><td>{row[11]:.2f}</td><td>{run_count}</td></tr>"
+
+htmlCode += "</table>"
+
+displayHTML(htmlCode)
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Analyzing Usage for Frequently Occurring Jobs
+# MAGIC The analysis above looks at all VM usage in Databricks, including VM's that aren't part of on-demand job clusters. This is useful data
+# MAGIC to understand Databricks compute consumption across an entire enterprise. However, to make progress, we need to re-focus on our
+# MAGIC core purpose here: we want to increase the resiliency of our regularly occurring Databricks jobs. We will do this by identifying
+# MAGIC jobs that are currently powered by on-demand job clusters and moving those to Instance Pools. Therefore, we do not need to be concerned
+# MAGIC about interactive clusters or jobs that are already running with Instance Pools. We also don't need to worry about jobs that run
+# MAGIC sporadically.
+# MAGIC
+# MAGIC To that end, the rest of the analysis in this notebook will focus only on "frequent jobs". These are jobs that appear often in the
+# MAGIC logs during the period of time for which we gathered usage data.
+
+# COMMAND ----------
+
+dates = observations.agg(F.min("date"), F.max("date")).collect()[0]
+range_start = dates[0]
+range_end = dates[1]
+range_minutes = (range_end - range_start).total_seconds() / 60
+
+frequent_jobs = observations \
+  .groupBy("subscriptionId", "location", "workspaceName", "vmSize", "JobName") \
+  .agg(F.min("date").alias("start"), F.max("date").alias("end")) \
+  .withColumn("minutes", F.expr("((unix_timestamp(end) - unix_timestamp(start)) / 60) + 1")) \
+  .withColumn("percentage", F.col("minutes") / F.lit(range_minutes)) \
+  .filter("JobName IS NOT NULL AND vmSize IS NOT NULL AND percentage > 0.75")
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ### VM SKU Variety
+# MAGIC Each Instance Pool in Databricks can only contain one type of VM. If we are using many different types of VM's, then we will have to
+# MAGIC create many instance pools. That can create extra management overhead. It can also reduce the efficiency of our pools because spare
+# MAGIC VM's from one instance pool cannot be used to fill demand in another pool.
+# MAGIC
+# MAGIC Therefore, to make our instance pools more efficient, we should consider reducing the number of SKU's that we use as much as possible.
+# MAGIC In some cases, you may have done very careful testing to select the exact VM SKU that optimizes your workload. But in other cases
+# MAGIC (and it seems to be the most common case), a data engineer may have just blindly selected a VM type without putting any thought into it.
+# MAGIC For these jobs, we might consider changing the VM type and consolidating many jobs into just a few VM types.
+# MAGIC
+# MAGIC In the report, a red exclamation mark (❗) is used to indicate an older version of a VM type if you are also using a newer version of
+# MAGIC that same type. The report also highlights usage of the "Standard_DS3_v2" VM type. There's nothing wrong with this VM type (it's great, actually!),
+# MAGIC but it's the default option when creating a compute cluster in Azure Databricks. It is highlighted simply to illustrate how often your data
+# MAGIC engineers simply accept the default instead of carefully selecting a VM type.
+
+# COMMAND ----------
+
+# DBTITLE 1,VM SKU Variety
+from pyspark.sql.window import Window
+
+vm_skus = spark.read.format("delta").load(delta_location + "/silver/vm_skus")
+
+sku_usage = observations \
+  .join(frequent_jobs, on=["subscriptionId", "location", "workspaceName", "vmSize", "JobName"], how="inner") \
+  .groupBy("subscriptionId", "location", "workspaceName", "vmSize") \
+  .agg(F.countDistinct("JobName").alias("job_count"), F.sum("count").alias("count"))
+
+order_by_cols = ["subscriptionId", "workspaceName", "series", "family", "number_of_cores", "vmSize"]
+window_spec = Window.partitionBy("subscriptionId", "workspaceName", "series").orderBy(order_by_cols)
+
+sku_usage = sku_usage \
+  .join(vm_skus, (sku_usage.subscriptionId == vm_skus.subscription_id) & (sku_usage.location == vm_skus.location) & (sku_usage.vmSize == vm_skus.name), how="left_outer") \
+  .withColumn("next_name_without_version", F.lead("name_without_version").over(window_spec)) \
+  .orderBy(order_by_cols) \
+  .selectExpr("subscriptionId", "workspaceName", "COALESCE(series, 'Unknown') AS series", "family", "vmSize", "next_name_without_version", "name_without_version", "description", "count", "job_count", "number_of_cores", "memory_in_gb", "has_premium_storage", "is_amd", "has_local_temp_disk")
+
+data = sku_usage.collect()
+
+subscriptionId = ""
+workspaceName = ""
+series = ""
+
+# Build an HTML report: one header row, then rows grouped by subscription, workspace, and VM series
+htmlCode = """
+<table>
+  <tr>
+    <td>VM SKU</td>
+    <td>Total VM<br>Minutes</td>
+    <td>Job<br>Count</td>
+    <td>vCPUs</td>
+    <td>Memory</td>
+    <td>Premium<br>Storage</td>
+    <td>AMD<br>processor</td>
+    <td>Local<br>Temp Disk</td>
+  </tr>
+"""
+
+for row in data:
+    row_dict = row.asDict()
+    if row[0] != subscriptionId:
+        subscriptionId = row[0]
+        htmlCode += f"<tr><td colspan=8>Subscription: {subscriptionId}</td></tr>"
+
+    if row[1] != workspaceName:
+        workspaceName = row[1]
+        htmlCode += f"<tr><td colspan=8>Workspace: {workspaceName}</td></tr>"
+
+    if row[2] != series:
+        series = row[2]
+        htmlCode += f"<tr><td colspan=8><b>{series} series</b><br>"
+        htmlCode += f"{row_dict['description']}</td></tr>"
+
+    # Highlight the Databricks default SKU (Standard_DS3_v2) as described above
+    highlight = ' style="background-color: #fff2ac"' if row_dict['vmSize'] == 'Standard_DS3_v2' else ''
+    htmlCode += f"<tr{highlight}>"
+    htmlCode += f"<td>{row_dict['vmSize']}{' ❗' if row_dict['name_without_version'] == row_dict['next_name_without_version'] else ''}</td>"
+    htmlCode += f"<td>{row_dict['count']:,}</td>"
+    htmlCode += f"<td>{row_dict['job_count']:,}</td>"
+    htmlCode += f"<td>{row_dict['number_of_cores']:,}</td>"
+    htmlCode += f"<td>{int(row_dict['memory_in_gb'])}</td>"
+    htmlCode += f"<td>{'✅' if row_dict['has_premium_storage'] else ''}</td>"
+    htmlCode += f"<td>{'✅' if row_dict['is_amd'] else ''}</td>"
+    htmlCode += f"<td>{'✅' if row_dict['has_local_temp_disk'] else ''}</td>"
+    htmlCode += f"</tr>"
+
+htmlCode += "</table>"
+
+displayHTML(htmlCode)
+
+# COMMAND ----------
+
+
+
+# COMMAND ----------
+
+
+
+# COMMAND ----------
+
+from pyspark.sql.window import Window
+
+order_by_cols = ["subscriptionId", "location", "workspaceName", "vmSize", "JobName", "start_date"]
+window_spec = Window.partitionBy("subscriptionId", "location", "workspaceName", "vmSize", "JobName").orderBy(order_by_cols)
+
+job_runs = observations \
+  .filter("JobName IS NOT NULL AND vmSize IS NOT NULL") \
+  .groupBy("subscriptionId", "location", "workspaceName", "vmSize", "JobName", "JobId", "ClusterName", "RunName") \
+  .agg(F.sum("count").alias("total_count"), F.avg("count").alias("vms_per_minute"), F.min("date").alias("start_date"), F.max("date").alias("end_date")) \
+  .withColumn("run_minutes", F.expr("(UNIX_TIMESTAMP(end_date) - UNIX_TIMESTAMP(start_date)) / 60")) \
+  .withColumn("previous_start_date", F.lag("start_date").over(window_spec)) \
+  .withColumn("hours_between_starts", F.expr("(UNIX_TIMESTAMP(start_date) - UNIX_TIMESTAMP(previous_start_date)) / 3600.0")) \
+  .orderBy(order_by_cols)
+
+display(job_runs)
+
+# COMMAND ----------
+
+
+
+# COMMAND ----------
+
+
+
+# COMMAND ----------
+
+
+
+# COMMAND ----------
+
+vm_counts = observations \
+  .filter("JobName IS NOT NULL") \
+  .groupBy("subscriptionId", "workspaceName", "vmSize", "JobName", "date") \
+  .agg(F.sum("count").alias("count"))
+
+all_dates = spark.range(0, range_minutes).withColumn("date", F.lit(range_start) + F.expr("MAKE_INTERVAL(0, 0, 0, 0, 0, id)")).drop("id").orderBy("date")
+
+df = frequent_jobs.select("subscriptionId", "workspaceName", "vmSize", "JobName") \
+  .crossJoin(all_dates)
+
+df = df \
+  .join(vm_counts.alias("v"), ["subscriptionId", "workspaceName", "vmSize", "JobName", "date"], "leftouter") \
+  .withColumn("count", F.coalesce(F.col("count"), F.lit(0))) \
+  .groupBy("subscriptionId", "workspaceName", "vmSize", "jobName", "date") \
+  .agg(F.sum("count").alias("count")) \
+  .withColumn("job_active", F.expr("CASE WHEN count > 0 THEN jobName ELSE NULL END")) \
+  .cache()
+
+display( df.groupBy("subscriptionId", "workspaceName", "vmSize").agg(F.min("count"), F.max("count").alias("max_count")) )
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Inspection by VM SKU
+
+# COMMAND ----------
+
+subscription_id = "ee691273-18af-4600-bc24-eb6768bf9cfa"
+workspace_name = "dde-prod-dbks-w1"
+vm_size = "Standard_DS4_v2"
+
+# COMMAND ----------
+
+# DBTITLE 1,VM Usage Aggregated by Minute
+subset = df.filter(f"subscriptionId == '{subscription_id}' AND workspaceName == '{workspace_name}' AND vmSize == '{vm_size}'")
+subset = subset.withColumn("minute", F.minute("date"))
+subset = subset.withColumn("job_active", F.expr("CASE WHEN count > 0 THEN jobName ELSE NULL END"))
+subset = subset.groupBy("subscriptionId", "workspaceName", "vmSize", "minute").agg(F.sum("count").alias("count"), F.countDistinct("job_active").alias("job_count"))
+display(subset)
+
+# COMMAND ----------
+
+# DBTITLE 1,Daily Usage Visualization
+from datetime import timedelta
+import numpy as np
+import pandas as pd
+import matplotlib.pyplot as plt
+import matplotlib.dates as mdates
+
+date_range = observations.agg(F.min("date"), F.max("date")).collect()[0]
+first_full_day = date_range[0].date() + timedelta(days = 1)
+last_full_day = date_range[1].date() - timedelta(days = 1)
+days_count = (last_full_day - first_full_day).days + 1
+days = [first_full_day + timedelta(days=i) for i in range(0, days_count)]
+
+daily = df \
+  .withColumn("day", F.to_date("date")) \
+  .withColumn("time", F.expr("date
- MAKE_INTERVAL(0, 0, 0, DATEDIFF(date, '2000-01-01'))")) \ + .filter(f"subscriptionId == '{subscription_id}' AND workspaceName == '{workspace_name}' AND vmSize == '{vm_size}'") \ + .filter(f"day BETWEEN '{first_full_day}' AND '{last_full_day}'") \ + .groupBy("subscriptionId", "workspaceName", "vmSize", "date", "day", "time") \ + .agg(F.sum("count").alias("count")) \ + .cache() + +max_count = daily.agg(F.max("count")).collect()[0][0] +fig, ax = plt.subplots(days_count, 1, figsize=(24, (4 * days_count) + 2)) +hour_locator = mdates.HourLocator(interval=1) +hour_formatter = mdates.DateFormatter("%H") + +for i, day in enumerate(days): + data = daily.filter(f"day = '{day}'").orderBy("date").toPandas() + X = data["time"] + Y = data["count"] + + ax[i].plot(X, Y, color='black') + ax[i].fill_between(X, Y, 0, color='tomato') + + ax[i].xaxis.set_major_locator(hour_locator) + ax[i].xaxis.set_major_formatter(hour_formatter) + ax[i].set_ylim([0, max_count + 1]) + ax[i].set_xlim(pd.Timestamp('2000-01-01'), pd.Timestamp('2000-01-01 23:59:59')) + ax[i].set_ylabel(day, fontsize=20) + +fig.tight_layout() +fig.suptitle(f"Daily VM Usage Workspace: {workspace_name} VM SKU: {vm_size}", fontsize=30) +fig.subplots_adjust(top=0.97) + +# COMMAND ---------- + + diff --git a/README.md b/README.md index d9eb66c..a25a6e6 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Azure Databricks customers should be using [instance pools](https://learn.microsoft.com/en-us/azure/databricks/clusters/pool-best-practices) for their production workloads. These instance pools will help your jobs run faster (because you don't have to wait for -VM's to spin up) and will make your workload more resilient (because you won't get "Cloud Provisioning" errors). +VM's to spin up) and will make your workload more resilient (because it reduces your exposure to "Cloud Provisioning" errors). One common challenge to creating instance pools is knowing how large to make them. Customers may have multiple production workspaces, each with numerous jobs running at a variety of intervals. Determining the right size for the pools can require complex analysis. @@ -16,10 +16,33 @@ This tool has two phases: 1. **Data Analysis** - analyze the VM usage patterns to determine the most efficient size for your pools ## Setup -TODO: How to set up this accelerator +To run this solution, the code will need to connect to your Azure subscription. The best way to do this is to create a new +service principal that has read access to the subscriptions you would like to analyze. The easiest way to create this +service principal is with a CLI command like this: +``` +$subscriptionId = "" +az ad sp create-for-rbac --name "Databricks VM Usage Analyzer" --role reader --scopes /subscriptions/$subscriptionId +``` + +This command will return the results in JSON format. You can copy those results right into the first notebook, and you +should be ready to go! ## Phase 1: Data Acquisition -TODO: Running the first notebook +Run the "Acquire Data" notebook. This will query the Azure Activity Log and get VM operation information from the +Databricks-managed resource groups. The data will be saved as raw JSON files. The notebook will then transform +this data in various phases and create a tiny little Delta lake with bronze, silver, and gold layers. ## Phase 2: Data Analysis -TODO: How to run the second notebook +The second notebook, "Analyze Data," will read the data acquired in the previous phase. It will summarize VM usage +by job and by VM SKU. 
You can use this data to better understand your VM usage patterns and determine how to optimize
+your Databricks Instance Pool size.
+
+## Frequently Asked Questions (FAQ)
+
+**Why does your analysis only suggest one size for an instance pool? Why don't you suggest settings for minimum idle, maximum capacity,
+and idle instance auto-timeout that would allow the pool size to vary?**
+
+For this analysis, we are primarily concerned with creating pools for resiliency purposes. The idea is to create VM's and then hang
+on to them as long as possible so that in the event of an incident with the Azure VM service, we will already have the VM's that we
+need. So the question we are trying to answer is: How many VM's should I keep on hand to be able to run my jobs in the event of
+an Azure incident?
\ No newline at end of file
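
One rough way to start answering that question is to look at the peak (or a high percentile of the) per-minute VM count for each workspace and VM SKU in the gold `observations` table built in Phase 1. A minimal sketch, run from a Databricks notebook and assuming the default `delta_location` of `dbfs:/vm-observation`:

```
import pyspark.sql.functions as F

observations = spark.read.format("delta").load("dbfs:/vm-observation/gold/observations")

# Total VMs in use during each minute, per workspace and VM SKU
per_minute = observations.groupBy("workspaceName", "vmSize", "date").agg(F.sum("count").alias("vms"))

# Peak and 95th-percentile concurrency give a range of candidate pool sizes
display(
    per_minute.groupBy("workspaceName", "vmSize")
        .agg(F.max("vms").alias("peak_vms"),
             F.expr("percentile(vms, 0.95)").alias("p95_vms"))
        .orderBy(F.col("peak_vms").desc())
)
```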