Skip to content

Commit

Permalink
Update tiler cache script and add readme files
Browse files Browse the repository at this point in the history
  • Loading branch information
Rub21 committed Nov 27, 2024
1 parent 21d6883 commit 38676b2
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 141 deletions.
95 changes: 55 additions & 40 deletions images/tiler-cache/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,68 @@

This is a container that includes scripts to perform purge and seed operations. Each script must run on a different instance.

- Tiler seed script
## Seeding Tiles

Tiler seeding is a group of scripts aimed at generating tile cache for a specific zoom level, for example, from 1 to 7. The script will receive a GeoJSON of all the areas where tile cache generation is required for OHM tiles. This approach aims to reduce latency when a user starts interacting with OHM tiles.
This script is designed to minimize latency when users interact with OHM tiles by efficiently generating and seeding tiles across specified zoom levels. Running the entire world dataset may take a significant amount of time to generate the tile cache due to the large volume of data. so that the reson we prioritize certain areas.

The script processes a GeoJSON file containing areas where tile cache generation is required and seeds tiles for OHM, ensuring optimized performance.

- Tiler purge script
Usage

Script that reads an AWS SQS queue and creates a container to purge and seed the tiler cache for specific imposm expired files.
```sh
# The URL of the GeoJSON file specifying the areas where tile seeding is required.
export GEOJSON_URL: https://osmseed-dev.s3.us-east-1.amazonaws.com/tiler/wold-usa-eu.geojson
export ZOOM_LEVELS: '7,8,9,10' # The zoom levels for which tiles need to be seeded.
export CONCURRENCY: 32 # The number of parallel processes to use for generating cache tiles.
export S3_BUCKET: osmseed-dev # The S3 bucket where output statistics (e.g., seeding duration) will be stored.
export OUTPUT_FILE: /logs/tiler_benchmark.log #The path to a CSV file for logging benchmarking results and tracking database performance.

python seed.py
```


**Note**
To run these instances, a service account must be set up for the node that will execute them, as this container needs access to the AWS SQS service to function.
## Purging Tiles

This script processes an AWS SQS queue and launches a container to handle the purging and seeding of the tiler cache for specific imposm expired files. The script efficiently purges cache tiles within zoom levels 8 to 17. Due to the significant time required to purge higher zoom levels (18, 19, and 20), the script includes a separate section to directly delete these tiles from S3. By following specific patterns, this method is far more efficient than using the tiler purge process for zoom levels 18, 19, and 20.


```sh
# Create a ServiceAccount for managing Jobs and associated Pods
apiVersion: v1
kind: ServiceAccount
metadata:
name: job-service-account
namespace: default
---
# Create a ClusterRole with permissions for Jobs and Pods
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: job-manager-role
rules:
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["create", "list", "delete"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["list", "get"]
---
# Bind the ClusterRole to the ServiceAccount
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: job-manager-role-binding
subjects:
- kind: ServiceAccount
name: job-service-account
namespace: default
roleRef:
kind: ClusterRole
name: job-manager-role
apiGroup: rbac.authorization.k8s.io
```
# Environment settings
ENVIRONMENT = "staging" # Environment where the script is executed (e.g., staging or production).
NAMESPACE = "default" # Kubernetes namespace where the tiler cache pods will be triggered.
SQS_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/tiler-imposm3-expired-files" # AWS SQS queue URL for processing expired tiles.
REGION_NAME = "us-east-1" # AWS region where the deployment is hosted.
DOCKER_IMAGE = "ghcr.io/openhistoricalmap/tiler-server:0.0.1-0.dev.git.1780.h62561a8" # Docker image for the tiler server to handle cache purging and seeding.
NODEGROUP_TYPE = "job_large" # Node group label where the cache cleaning pods will be executed.
MAX_ACTIVE_JOBS = 5 # Maximum number of jobs allowed to run in parallel.
DELETE_OLD_JOBS_AGE = 3600 # Time in seconds after which old jobs will be deleted.

# Tiler cache purge and seed settings
EXECUTE_PURGE = "true" # Whether to execute the purge process.
EXECUTE_SEED = "true" # Whether to execute the seed process.

# Zoom level configurations for cache management
PURGE_MIN_ZOOM = 8 # Minimum zoom level for cache purging.
PURGE_MAX_ZOOM = 20 # Maximum zoom level for cache purging.
SEED_MIN_ZOOM = 8 # Minimum zoom level for tile seeding.
SEED_MAX_ZOOM = 14 # Maximum zoom level for tile seeding.

# Concurrency settings
SEED_CONCURRENCY = 16 # Number of parallel processes for seeding tiles.
PURGE_CONCURRENCY = 16 # Number of parallel processes for purging tiles.

# PostgreSQL settings for the tiler database
POSTGRES_HOST = "localhost" # Hostname of the PostgreSQL database.
POSTGRES_PORT = 5432 # Port for the PostgreSQL database.
POSTGRES_DB = "postgres" # Name of the PostgreSQL database.
POSTGRES_USER = "postgres" # Username for the PostgreSQL database.
POSTGRES_PASSWORD = "password" # Password for the PostgreSQL database.

# S3 settings for managing tile data
ZOOM_LEVELS_TO_DELETE = "18,19,20" # Zoom levels for which cache tiles will be deleted directly from S3.
S3_BUCKET_CACHE_TILER = "tiler-cache-staging" # S3 bucket where the tile cache is stored.
S3_BUCKET_PATH_FILES = "mnt/data/osm" # Path within the S3 bucket for tiles to be deleted.

python purge.py

```
41 changes: 40 additions & 1 deletion images/tiler-cache/purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datetime import datetime, timezone, timedelta
import logging
from utils import check_tiler_db_postgres_status
from s3_cleanup import compute_children_tiles, generate_tile_patterns, delete_folders_by_pattern

logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(message)s",
Expand All @@ -23,7 +24,7 @@
)
NODEGROUP_TYPE = os.getenv("NODEGROUP_TYPE", "job_large")
MAX_ACTIVE_JOBS = int(os.getenv("MAX_ACTIVE_JOBS", 2))
DELETE_OLD_JOBS_AGE = int(os.getenv("DELETE_OLD_JOBS_AGE", 86400)) # default 1 day
DELETE_OLD_JOBS_AGE = int(os.getenv("DELETE_OLD_JOBS_AGE", 3600)) # default 1 hour

# Tiler cache purge and seed settings
EXECUTE_PURGE = os.getenv("EXECUTE_PURGE", "true")
Expand All @@ -44,6 +45,10 @@
POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "password")

ZOOM_LEVELS_TO_DELETE = list(map(int, os.getenv("ZOOM_LEVELS_TO_DELETE", "18,19,20").split(",")))
S3_BUCKET_CACHE_TILER = os.getenv("S3_BUCKET_CACHE_TILER", "tiler-cache-staging")
S3_BUCKET_PATH_FILES = os.getenv("S3_BUCKET_PATH_FILES", "mnt/data/osm")

# Initialize Kubernetes and AWS clients
sqs = boto3.client("sqs", region_name=REGION_NAME)
config.load_incluster_config()
Expand Down Expand Up @@ -126,6 +131,37 @@ def create_kubernetes_job(file_url, file_name):
logging.error(f"Failed to create Kubernetes Job '{job_name}': {e}")



def cleanup_zoom_levels(s3_path, zoom_levels, bucket_name, path_file):
"""
Executes the S3 cleanup process for specific zoom levels.
Args:
s3_path (str): Path to the S3 tiles file.
zoom_levels (list): List of zoom levels to process.
bucket_name (str): Name of the S3 bucket for deletion.
Returns:
None
"""
try:
logging.info(f"Starting cleanup for S3 path: {s3_path}, zoom levels: {zoom_levels}, bucket: {bucket_name}")

# Compute child tiles
tiles = compute_children_tiles(s3_path, zoom_levels)

# Generate patterns for deletion
patterns = generate_tile_patterns(tiles)
logging.info(f"Generated tile patterns for deletion: {patterns}")

# Delete folders based on patterns
delete_folders_by_pattern(bucket_name, patterns, path_file)
logging.info("S3 cleanup completed successfully.")

except Exception as e:
logging.error(f"Error during cleanup: {e}")
raise

def process_sqs_messages():
"""Process messages from the SQS queue and create Kubernetes Jobs for each file."""
while True:
Expand Down Expand Up @@ -174,6 +210,9 @@ def process_sqs_messages():
# Create a Kubernetes job
create_kubernetes_job(file_url, file_name)

# Remove zoom levels 18,19,20
cleanup_zoom_levels(file_url, ZOOM_LEVELS_TO_DELETE, S3_BUCKET_CACHE_TILER, S3_BUCKET_PATH_FILES)

elif "Event" in body and body["Event"] == "s3:TestEvent":
logging.info("Test event detected. Ignoring...")

Expand Down
105 changes: 105 additions & 0 deletions images/tiler-cache/s3_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import boto3
import re
import click
import logging

def compute_children_tiles(s3_path, zoom_levels):
"""
Compute child tiles for the specified zoom levels from a parent tile file in S3.
Args:
s3_path (str): S3 path pointing to the .tiles file.
zoom_levels (list): List of zoom levels for which to compute children.
Returns:
list: A list of child tile paths in "zoom/x/y" format only for the target zoom levels.
"""
logging.info(f"Starting computation of child tiles for {s3_path} and zoom levels {zoom_levels}.")

s3_client = boto3.client("s3")
s3_match = re.match(r"s3://([^/]+)/(.+)", s3_path)
if not s3_match:
raise ValueError(f"Invalid S3 path: {s3_path}")

bucket_name, key = s3_match.groups()
child_tiles = set()

try:
logging.info(f"Fetching file from S3 bucket: {bucket_name}, key: {key}.")
response = s3_client.get_object(Bucket=bucket_name, Key=key)
file_content = response["Body"].read().decode("utf-8")

logging.info(f"Processing tiles in file.")
for line in file_content.splitlines():
tile = line.strip()
match = re.match(r"(\d+)/(\d+)/(\d+)", tile)
if match:
z, x, y = map(int, match.groups())
for target_zoom in zoom_levels:
while z < target_zoom:
x *= 2
y *= 2
z += 1
# Add all 4 children tiles only for the target zoom level
if z == target_zoom:
child_tiles.add(f"{z}/{x}/{y}")
child_tiles.add(f"{z}/{x+1}/{y}")
child_tiles.add(f"{z}/{x}/{y+1}")
child_tiles.add(f"{z}/{x+1}/{y+1}")

except Exception as e:
logging.error(f"Error processing S3 file: {e}")
raise

return list(child_tiles)

def generate_tile_patterns(tiles):
"""
Generate unique tile patterns (zoom/prefix).
Args:
tiles (list): List of tile strings in the format 'zoom/x/y'.
Returns:
list: List of unique patterns in the format 'zoom/prefix'.
"""
patterns = set()
for tile in tiles:
match = re.match(r"(\d+)/(\d+)/(\d+)", tile)
if match:
zoom, x, _ = match.groups()
prefix = f"{zoom}/{str(x)[:2]}"
patterns.add(prefix)
return list(patterns)

def delete_folders_by_pattern(bucket_name, patterns, path_file):
"""
Delete folders in the S3 bucket matching the pattern:
s3://<bucket>/mnt/data/osm/<zoom>/<prefix>
Args:
bucket_name (str): The name of the S3 bucket.
patterns (list): A list of patterns in the format '<zoom>/<prefix>'.
Returns:
None
"""
s3_client = boto3.client("s3")

try:
for pattern in patterns:
zoom, prefix = pattern.split("/")
folder_prefix = f"{path_file}/{zoom}/{prefix}"
logging.info(f"Looking for objects under folder: {folder_prefix}")
paginator = s3_client.get_paginator("list_objects_v2")
response_iterator = paginator.paginate(Bucket=bucket_name, Prefix=folder_prefix)

for page in response_iterator:
for obj in page.get("Contents", []):
key = obj["Key"]
logging.info(f"Deleting object: {key}")
s3_client.delete_object(Bucket=bucket_name, Key=key)
logging.info("Deletion completed for all matching patterns.")
except Exception as e:
logging.error(f"Error deleting folders: {e}")
raise
61 changes: 23 additions & 38 deletions images/tiler-cache/seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,23 @@
level=logging.INFO,
)

# Fetch environment variables
GEOJSON_URL = os.getenv("GEOJSON_URL", None)
ZOOM_LEVELS = os.getenv("ZOOM_LEVELS", "6,7")
CONCURRENCY = int(os.getenv("CONCURRENCY", 32))
S3_BUCKET = os.getenv("S3_BUCKET", "osmseed-dev")
OUTPUT_FILE = os.getenv("OUTPUT_FILE", "log_file.csv")

@click.command(short_help="Script to request or seed tiles from a Tiler API.")
@click.option(
"--geojson-url",
required=True,
help="URL to the GeoJSON file defining the area of interest.",
)
@click.option(
"--zoom-levels",
help="Comma-separated list of zoom levels",
default="6,7",
)
@click.option(
"--concurrency",
help="Number of concurrent processes for seeding",
default=32,
type=int,
)
@click.option(
"--s3-bucket",
help="S3 bucket to upload the result CSV file",
default="osmseed-dev",
)
@click.option(
"--log-file",
help="CSV file to save the logs results",
default="log_file.csv",
)
def main(geojson_url, zoom_levels, concurrency, log_file, s3_bucket):
def main():
"""
Main function to process and seed tiles
"""

if not GEOJSON_URL:
logging.error("Environment variable GEOJSON_URL is required but not set. Exiting.")
return

logging.info("Starting the tile seeding process.")

# Check PostgreSQL status
Expand All @@ -57,37 +42,37 @@ def main(geojson_url, zoom_levels, concurrency, log_file, s3_bucket):
logging.info("PostgreSQL database is running and reachable.")

# Extract base name from the GeoJSON URL
parsed_url = urlparse(geojson_url)
parsed_url = urlparse(GEOJSON_URL)
base_name = os.path.splitext(os.path.basename(parsed_url.path))[0]
logging.info(f"Base name extracted from GeoJSON URL: {base_name}")

# Parse zoom levels
zoom_levels = list(map(int, zoom_levels.split(",")))
zoom_levels = list(map(int, ZOOM_LEVELS.split(",")))
min_zoom = min(zoom_levels)
max_zoom = max(zoom_levels)
logging.info(f"Zoom levels parsed: Min Zoom: {min_zoom}, Max Zoom: {max_zoom}")

features, tiles = process_geojson_to_feature_tiles(geojson_url, min_zoom)
# Process GeoJSON and compute tiles
features, tiles = process_geojson_to_feature_tiles(GEOJSON_URL, min_zoom)
geojson_file = f"{base_name}_tiles.geojson"
save_geojson_boundary(features, geojson_file)

# Use base name for skipped tiles and log files
skipped_tiles_file = f"{base_name}_skipped_tiles.tiles"
log_file = f"{base_name}_seeding_log.csv"
OUTPUT_FILE = f"{base_name}_seeding_log.csv"

# Seed the tiles
logging.info("Starting the seeding process...")
seed_tiles(tiles, concurrency, min_zoom, max_zoom, log_file, skipped_tiles_file)
seed_tiles(tiles, CONCURRENCY, min_zoom, max_zoom, OUTPUT_FILE, skipped_tiles_file)
logging.info("Tile seeding complete.")
logging.info(f"Skipped tiles saved to: {skipped_tiles_file}")
logging.info(f"Log of seeding performance saved to: {log_file}")
logging.info(f"Log of seeding performance saved to: {OUTPUT_FILE}")

# Upload log files to S3
upload_to_s3(log_file, s3_bucket, f"tiler/logs/{log_file}")
upload_to_s3(skipped_tiles_file, s3_bucket, f"tiler/logs/{skipped_tiles_file}")
upload_to_s3(skipped_tiles_file, s3_bucket, f"tiler/logs/{geojson_file}")
upload_to_s3(OUTPUT_FILE, S3_BUCKET, f"tiler/logs/{OUTPUT_FILE}")
upload_to_s3(skipped_tiles_file, S3_BUCKET, f"tiler/logs/{skipped_tiles_file}")
upload_to_s3(geojson_file, S3_BUCKET, f"tiler/logs/{geojson_file}")
logging.info("Log files uploaded to S3.")


if __name__ == "__main__":
main()
main()
Loading

0 comments on commit 38676b2

Please sign in to comment.