Merge phash_main into main branch (#204)
* Implementing hash bucketing v2 (#178)

* Implementing hash bucket v2

* Fix the assertion regarding hash buckets

* Python 3.7 does not have doClassCleanups in super

* Fix the memory issue with the hb index creation

* Compaction session implementation for algo v2 (#187)

* Compaction session implementation for algo v2

* Address comments

* Added capability to measure instance minutes in an autoscaling cluster setting

* Avoid configuring logger if ray is uninitialized

* Resolve merge conflict and rebase from main

* Adding additional optimization from POC (#194)

* Adding additional optimization from POC

* fix typo

* Moved the compact_partition tests to the top-level module

* Adding unit tests for parquet downloaders

* fix typo

* fix repartition session

* Adding stack trace and passing config kwargs separately due to s3fs bug

* fix the parquet reader

* pass deltacat_storage_kwargs in repartition_session

* addressed comments and extend tests to handle v2

* Add merge support and unit tests (#193)

* Add merge support and unit tests

* Add merge support and unit tests

* fix drop_duplicates

* fix merge and ensure all v1 tests are passing

* fix the naming

* Refactor drop_duplicates into its own module

* fix the hash group indices range

* Copy empty hash bucket support; Fix for empty hash bucket in old compacted table

* refactor and naming changes

* Add case when no primary keys

* Add capability to avoid dropping duplicates for rebase

* Support DELETE deltas including unit tests

* only create a delta type column when a delete bundle is present

* fix all issues during actual run

* fix incremental compaction num_rows None

* remove db_test.sqlite

* optimize appending the parquet files

* address comments

* address comments

* address comments

---------

Co-authored-by: Raghavendra Dani <[email protected]>

* Merge phash_main branch into main

* Bumping up deltacat version

---------

Co-authored-by: Zyiqin-Miranda <[email protected]>
raghumdani and Zyiqin-Miranda authored Aug 31, 2023
1 parent 3a1dadf commit 9ba2aba
Showing 33 changed files with 1,296 additions and 158 deletions.
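A quick orientation before the file diffs: this merge brings the compactor v2 hash bucketing and merge path into main and adds a drop_duplicates compaction parameter. Below is a hedged sketch of the parameter keys involved; only keys visible in the diffs that follow are shown, and the locators and storage handles a real run also needs are elided.

```python
# Hedged sketch, not verbatim from this commit: parameter keys a v2 compaction
# run reads, including the new drop_duplicates flag. Values are illustrative.
compaction_params = {
    "hash_bucket_count": 8,
    "hash_group_count": 8,    # defaults to hash_bucket_count when omitted
    "drop_duplicates": True,  # new here; defaults to DROP_DUPLICATES (True)
    # ... source/destination partition locators, deltacat_storage, etc. elided ...
}
# CompactPartitionParams.of(compaction_params) wraps this dict; see the
# compact_partition_params.py diff below.
```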
2 changes: 1 addition & 1 deletion deltacat/__init__.py
@@ -44,7 +44,7 @@

deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__))

__version__ = "0.1.18b15"
__version__ = "0.1.18b16"


__all__ = [
12 changes: 11 additions & 1 deletion deltacat/compute/compactor/model/compact_partition_params.py
@@ -19,6 +19,7 @@
MIN_FILES_IN_BATCH,
AVERAGE_RECORD_SIZE_BYTES,
TASK_MAX_PARALLELISM,
DROP_DUPLICATES,
)
from deltacat.constants import PYARROW_INFLATION_MULTIPLIER
from deltacat.compute.compactor.utils.sort_key import validate_sort_keys
@@ -88,6 +89,7 @@ def of(params: Optional[Dict]) -> CompactPartitionParams:
result.hash_group_count = params.get(
"hash_group_count", result.hash_bucket_count
)
result.drop_duplicates = params.get("drop_duplicates", DROP_DUPLICATES)

if not importlib.util.find_spec("memray"):
result.enable_profiler = False
@@ -196,7 +198,7 @@ def min_files_in_batch(self, min_files_in_batch: float) -> None:

@property
def min_delta_bytes_in_batch(self) -> float:
return self["min_files_in_batch"]
return self["min_delta_bytes_in_batch"]

@min_delta_bytes_in_batch.setter
def min_delta_bytes_in_batch(self, min_delta_bytes_in_batch: float) -> None:
@@ -258,6 +260,14 @@ def records_per_compacted_file(self) -> int:
def records_per_compacted_file(self, count: int) -> None:
self["records_per_compacted_file"] = count

@property
def drop_duplicates(self) -> bool:
return self["drop_duplicates"]

@drop_duplicates.setter
def drop_duplicates(self, value: bool):
self["drop_duplicates"] = value

@property
def bit_width_of_sort_keys(self) -> int:
return self["bit_width_of_sort_keys"]
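For context on the two edits above: CompactPartitionParams is a dict subclass whose properties proxy dictionary keys, which is why the min_delta_bytes_in_batch getter had to be pointed at the matching key and how the new drop_duplicates property round-trips. A minimal stand-alone sketch of the pattern; ParamsSketch is a stand-in, not a deltacat class.

```python
# Minimal sketch of the dict-backed parameter pattern; not a deltacat class.
class ParamsSketch(dict):
    @property
    def min_delta_bytes_in_batch(self) -> float:
        # the fix above: read the matching key, not "min_files_in_batch"
        return self["min_delta_bytes_in_batch"]

    @property
    def drop_duplicates(self) -> bool:
        return self["drop_duplicates"]

    @drop_duplicates.setter
    def drop_duplicates(self, value: bool) -> None:
        self["drop_duplicates"] = value


p = ParamsSketch()
p.drop_duplicates = False
assert p["drop_duplicates"] is False  # the property writes through to the dict
p["min_delta_bytes_in_batch"] = 2.0
assert p.min_delta_bytes_in_batch == 2.0
```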
13 changes: 13 additions & 0 deletions deltacat/compute/compactor/model/compaction_session_audit_info.py
@@ -1,5 +1,6 @@
# Allow classes to use self-referencing Type hints in Python 3.7.
from __future__ import annotations
import pyarrow as pa
import logging
from deltacat import logs
from typing import List, Union
@@ -419,6 +420,13 @@ def used_cpu_seconds(self) -> float:
"""
return self.get("usedCPUSeconds")

@property
def pyarrow_version(self) -> str:
"""
The version of PyArrow used.
"""
return self.get("pyarrowVersion")

# Setters follow

def set_audit_url(self, audit_url: str) -> CompactionSessionAuditInfo:
@@ -735,6 +743,10 @@ def set_used_cpu_seconds(self, value: float) -> CompactionSessionAuditInfo:
self["usedCPUSeconds"] = value
return self

def set_pyarrow_version(self, value: str) -> CompactionSessionAuditInfo:
self["pyarrowVersion"] = value
return self

# High level methods to save stats
def save_step_stats(
self,
@@ -863,4 +875,5 @@ def save_round_completion_stats(
)
)

self.set_pyarrow_version(pa.__version__)
self.set_telemetry_time_in_seconds(total_telemetry_time)
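The new pyarrowVersion field is useful because the audit object is serialized straight to JSON (the session calls json.dumps(compaction_audit) before uploading it to S3), so the PyArrow version in use becomes visible in the uploaded audit file. A small illustrative sketch with a plain dict standing in for CompactionSessionAuditInfo:

```python
# Illustrative only: a plain dict stands in for CompactionSessionAuditInfo.
import json

import pyarrow as pa

audit = {}
audit["pyarrowVersion"] = pa.__version__  # what set_pyarrow_version() records
print(json.dumps(audit))                  # e.g. {"pyarrowVersion": "12.0.1"}
```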
16 changes: 10 additions & 6 deletions deltacat/compute/compactor/model/delta_annotated.py
@@ -89,6 +89,11 @@ def rebatch(
for delta_annotated in annotated_deltas:
split_annotated_deltas.extend(DeltaAnnotated._split_single(delta_annotated))

logger.info(
f"Split the {len(annotated_deltas)} annotated deltas "
f"into {len(split_annotated_deltas)} groups."
)

for src_da in split_annotated_deltas:
src_da_annotations = src_da.annotations
src_da_entries = src_da.manifest.entries
@@ -280,12 +285,11 @@ def _split_single(delta_annotated: DeltaAnnotated) -> List[DeltaAnnotated]:
)

result.append(new_da)
else:
return [delta_annotated]

if result:
return result
else:
logger.info(
f"Split was not performed on the delta with locator: {delta_annotated.locator}"
)
logger.info(
f"Split was not performed on the delta with locator: {delta_annotated.locator}"
)

return [delta_annotated]
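The _split_single change above simplifies the fallback logic: a delta that cannot be split is now returned immediately from the else branch, and the original delta is also returned when a split attempt yields nothing. A simplified control-flow sketch; can_split and do_split are hypothetical stand-ins for the real manifest-entry splitting logic.

```python
# Simplified control-flow sketch of the refactor above; can_split and do_split
# are hypothetical helpers, not deltacat APIs.
def split_single_sketch(delta_annotated, can_split, do_split):
    if not can_split(delta_annotated):
        # early return: pass unsplittable deltas through unchanged
        return [delta_annotated]
    result = do_split(delta_annotated)
    if result:
        return result
    # split attempted but produced nothing; fall back to the original delta
    return [delta_annotated]
```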
141 changes: 72 additions & 69 deletions deltacat/compute/compactor_v2/compaction_session.py
@@ -133,7 +133,7 @@ def _execute_compaction(
# read the results from any previously completed compaction round
round_completion_info = None
high_watermark = None
previous_compacted_delta = None
previous_compacted_delta_manifest = None

if not params.rebase_source_partition_locator:
round_completion_info = rcf.read_round_completion_file(
@@ -147,13 +147,11 @@
)
else:
compacted_delta_locator = round_completion_info.compacted_delta_locator
previous_compacted_delta = params.deltacat_storage.get_delta(
namespace=compacted_delta_locator.namespace,
table_name=compacted_delta_locator.table_name,
table_version=compacted_delta_locator.table_version,
stream_position=compacted_delta_locator.stream_position,
include_manifest=True,
**params.deltacat_storage_kwargs,

previous_compacted_delta_manifest = (
params.deltacat_storage.get_delta_manifest(
compacted_delta_locator, **params.deltacat_storage_kwargs
)
)

high_watermark = round_completion_info.high_watermark
@@ -182,7 +180,22 @@ def _execute_compaction(
params.list_deltas_kwargs,
)

uniform_deltas = io.create_uniform_input_deltas(
input_deltas=input_deltas,
hash_bucket_count=params.hash_bucket_count,
compaction_audit=compaction_audit,
deltacat_storage=params.deltacat_storage,
previous_inflation=params.previous_inflation,
min_delta_bytes=params.min_delta_bytes_in_batch,
min_file_counts=params.min_files_in_batch,
# disable input split during rebase as the rebase files are already uniform
enable_input_split=params.rebase_source_partition_locator is None,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
)

delta_discovery_end = time.monotonic()

compaction_audit.set_uniform_deltas_created(len(uniform_deltas))
compaction_audit.set_delta_discovery_time_in_seconds(
delta_discovery_end - delta_discovery_start
)
@@ -197,19 +210,6 @@ def _execute_compaction(
logger.info("No input deltas found to compact.")
return None, None, None

uniform_deltas = io.create_uniform_input_deltas(
input_deltas=input_deltas,
hash_bucket_count=params.hash_bucket_count,
compaction_audit=compaction_audit,
deltacat_storage=params.deltacat_storage,
previous_inflation=params.previous_inflation,
min_delta_bytes=params.min_delta_bytes_in_batch,
min_file_counts=params.min_files_in_batch,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
)

compaction_audit.set_uniform_deltas_created(len(uniform_deltas))

hb_options_provider = functools.partial(
task_resource_options_provider,
pg_config=params.pg_config,
@@ -221,20 +221,21 @@

hb_start = time.monotonic()

hash_bucket_input_provider = lambda index, item: {
"input": HashBucketInput.of(
item,
primary_keys=params.primary_keys,
num_hash_buckets=params.hash_bucket_count,
num_hash_groups=params.hash_group_count,
enable_profiler=params.enable_profiler,
metrics_config=params.metrics_config,
read_kwargs_provider=params.read_kwargs_provider,
object_store=params.object_store,
deltacat_storage=params.deltacat_storage,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
)
}
def hash_bucket_input_provider(index, item):
return {
"input": HashBucketInput.of(
item,
primary_keys=params.primary_keys,
num_hash_buckets=params.hash_bucket_count,
num_hash_groups=params.hash_group_count,
enable_profiler=params.enable_profiler,
metrics_config=params.metrics_config,
read_kwargs_provider=params.read_kwargs_provider,
object_store=params.object_store,
deltacat_storage=params.deltacat_storage,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
)
}

hb_tasks_pending = invoke_parallel(
items=uniform_deltas,
@@ -332,33 +333,36 @@ def _execute_compaction(
hash_group_size_bytes=all_hash_group_idx_to_size_bytes,
hash_group_num_rows=all_hash_group_idx_to_num_rows,
round_completion_info=round_completion_info,
compacted_delta=previous_compacted_delta,
compacted_delta_manifest=previous_compacted_delta_manifest,
primary_keys=params.primary_keys,
deltacat_storage=params.deltacat_storage,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
)

merge_input_provider = lambda index, item: {
"input": MergeInput.of(
dfe_groups_refs=item[1],
write_to_partition=compacted_partition,
compacted_file_content_type=params.compacted_file_content_type,
primary_keys=params.primary_keys,
sort_keys=params.sort_keys,
merge_task_index=index,
hash_group_index=item[0],
num_hash_groups=params.hash_group_count,
max_records_per_output_file=params.records_per_compacted_file,
enable_profiler=params.enable_profiler,
metrics_config=params.metrics_config,
s3_table_writer_kwargs=params.s3_table_writer_kwargs,
read_kwargs_provider=params.read_kwargs_provider,
round_completion_info=round_completion_info,
object_store=params.object_store,
deltacat_storage=params.deltacat_storage,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
)
}
def merge_input_provider(index, item):
return {
"input": MergeInput.of(
dfe_groups_refs=item[1],
write_to_partition=compacted_partition,
compacted_file_content_type=params.compacted_file_content_type,
primary_keys=params.primary_keys,
sort_keys=params.sort_keys,
merge_task_index=index,
hash_bucket_count=params.hash_bucket_count,
drop_duplicates=params.drop_duplicates,
hash_group_index=item[0],
num_hash_groups=params.hash_group_count,
max_records_per_output_file=params.records_per_compacted_file,
enable_profiler=params.enable_profiler,
metrics_config=params.metrics_config,
s3_table_writer_kwargs=params.s3_table_writer_kwargs,
read_kwargs_provider=params.read_kwargs_provider,
round_completion_info=round_completion_info,
object_store=params.object_store,
deltacat_storage=params.deltacat_storage,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
)
}

merge_start = time.monotonic()

@@ -399,33 +403,32 @@ def _execute_compaction(
mat_results, key=lambda m: m.task_index
)

deltas = [m.delta for m in mat_results]

hb_id_to_entry_indices_range = {}
file_index = 0
previous_task_index = -1

for m in mat_results:
assert m.pyarrow_write_result.files >= 1, "Atleast file must be materialized"
assert m.task_index != previous_task_index, (
"Multiple materialize results found for a " f"hash bucket: {m.task_index}"
)
for mat_result in mat_results:
assert (
mat_result.pyarrow_write_result.files >= 1
), "Atleast one file must be materialized"
assert (
mat_result.task_index != previous_task_index
), f"Multiple materialize results found for a hash bucket: {mat_result.task_index}"

hb_id_to_entry_indices_range[str(m.task_index)] = (
hb_id_to_entry_indices_range[str(mat_result.task_index)] = (
file_index,
file_index + m.pyarrow_write_result.files - 1,
file_index + mat_result.pyarrow_write_result.files,
)

file_index += m.pyarrow_write_result.files
previous_task_index = m.task_index
file_index += mat_result.pyarrow_write_result.files
previous_task_index = mat_result.task_index

s3_utils.upload(
compaction_audit.audit_url,
str(json.dumps(compaction_audit)),
**params.s3_client_kwargs,
)

mat_results = sorted(mat_results, key=lambda m: m.task_index)
deltas = [m.delta for m in mat_results]

# Note: An appropriate last stream position must be set
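One behavioral detail from the hunks above: hb_id_to_entry_indices_range now stores a half-open (start, end) pair per hash bucket, with the end index exclusive instead of inclusive, so consecutive buckets tile the compacted manifest entries without overlap. A quick worked example with illustrative file counts:

```python
# Worked example of the new entry-index bookkeeping; counts are illustrative.
files_per_task = {"0": 3, "1": 2}  # task_index -> files materialized

hb_id_to_entry_indices_range = {}
file_index = 0
for task_index, files in files_per_task.items():
    # start inclusive, end exclusive
    hb_id_to_entry_indices_range[task_index] = (file_index, file_index + files)
    file_index += files

print(hb_id_to_entry_indices_range)  # {'0': (0, 3), '1': (3, 5)}
```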
3 changes: 3 additions & 0 deletions deltacat/compute/compactor_v2/constants.py
@@ -32,3 +32,6 @@
# Since, sorting is nlogn, we ensure that is not performed
# on a very large dataset for best performance.
MAX_SIZE_OF_RECORD_BATCH_IN_GIB = 2 * 1024 * 1024 * 1024

# Whether to drop duplicates during merge.
DROP_DUPLICATES = True
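For intuition on what this flag governs (simplified; the real merge step operates on delta file envelopes grouped by hash bucket): with drop_duplicates enabled, only the latest record per primary key survives the merge, while disabling it, for example when rebasing data that is already deduplicated, keeps every record. A hedged PyArrow illustration:

```python
# Simplified illustration only; not the deltacat merge implementation.
import pyarrow as pa

table = pa.table({"pk": [1, 1, 2], "value": ["old", "new", "x"]})

# keep the last occurrence of each primary key (records assumed in delta order)
last_index_by_pk = {}
for i, key in enumerate(table["pk"].to_pylist()):
    last_index_by_pk[key] = i
deduped = table.take(sorted(last_index_by_pk.values()))

print(deduped.to_pydict())  # {'pk': [1, 2], 'value': ['new', 'x']}
```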
18 changes: 17 additions & 1 deletion deltacat/compute/compactor_v2/model/merge_input.py
@@ -10,6 +10,10 @@
SortKey,
interface as unimplemented_deltacat_storage,
)
from deltacat.compute.compactor_v2.constants import (
DROP_DUPLICATES,
MAX_RECORDS_PER_COMPACTED_FILE,
)
from deltacat.types.media import ContentType
from deltacat.compute.compactor.model.round_completion_info import RoundCompletionInfo
from deltacat.compute.compactor.model.delta_file_envelope import DeltaFileEnvelopeGroups
Expand All @@ -24,9 +28,11 @@ def of(
primary_keys: List[str],
hash_group_index: int,
num_hash_groups: int,
hash_bucket_count: int,
drop_duplicates: Optional[bool] = DROP_DUPLICATES,
sort_keys: Optional[List[SortKey]] = None,
merge_task_index: Optional[int] = 0,
max_records_per_output_file: Optional[int] = 4_000_000,
max_records_per_output_file: Optional[int] = MAX_RECORDS_PER_COMPACTED_FILE,
enable_profiler: Optional[bool] = False,
metrics_config: Optional[MetricsConfig] = None,
s3_table_writer_kwargs: Optional[Dict[str, Any]] = None,
@@ -44,6 +50,8 @@ def of(
result["primary_keys"] = primary_keys
result["hash_group_index"] = hash_group_index
result["num_hash_groups"] = num_hash_groups
result["hash_bucket_count"] = hash_bucket_count
result["drop_duplicates"] = drop_duplicates
result["sort_keys"] = sort_keys
result["merge_task_index"] = merge_task_index
result["max_records_per_output_file"] = max_records_per_output_file
@@ -82,6 +90,14 @@ def hash_group_index(self) -> int:
def num_hash_groups(self) -> int:
return self["num_hash_groups"]

@property
def hash_bucket_count(self) -> int:
return self["hash_bucket_count"]

@property
def drop_duplicates(self) -> int:
return self["drop_duplicates"]

@property
def sort_keys(self) -> Optional[List[SortKey]]:
return self.get("sort_keys")
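The new hash_bucket_count and drop_duplicates fields are plumbed through MergeInput as shown above. A hypothetical construction sketch follows, assuming MergeInput.of remains the thin dict wrapper the diff suggests; the placeholder values stand in for the live object-store refs and partition handle a real session passes.

```python
# Hypothetical call-shape sketch; placeholder values only.
from deltacat.compute.compactor_v2.model.merge_input import MergeInput
from deltacat.types.media import ContentType

merge_input = MergeInput.of(
    dfe_groups_refs=[],        # object-store refs in a real run
    write_to_partition=None,   # compacted partition handle in a real run
    compacted_file_content_type=ContentType.PARQUET,
    primary_keys=["id"],
    hash_group_index=0,
    num_hash_groups=1,
    hash_bucket_count=1,       # new in this change
    drop_duplicates=True,      # new in this change
)
assert merge_input.hash_bucket_count == 1
assert merge_input.drop_duplicates is True
```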