Фінальний проєкт можна виконати одним із двох способів:
- Використовуючи Google Cloud, а саме сервіси GCS та BigQuery
- Використовуючи Pyspark локально
Незалежно який варіант Ви виберете, оркеструвати data workflow потрібно за допомогою Apache Airflow
Якщо Ви вибрали перший варіант (Google Cloud), приклад як оркеструвати
сервіси GCS та BigQuery Ви можете знайти у файлі example_dag_2.py
, в папці 7ї лекції.
Якщо Ви вибрали другий варіант, то приклад оркестрації джоб Pyspark
можна знайти у файлі example_dag_3.py
.
Важливо!! Подальший опис завдання викладений з припущенням, що Ви вибрали перший підхід (Google Cloud). Якщо Ви вибираєте другий підхід, то Вам треба адаптувати опис завдання самому. Для цього замість "бакет" або "датасет" використовуйте слово "директорія".
Звичайно, Ви можете виконати завдання використовуючи обидва варіанти. Хоча на бал оцінки це не впливає, це допоможе краще відчути різницю обох підходів.
При використанні варіанту з Pyspark обов'язково всю роботу виконуйте лише локально! Аналогічна задача, виконана в клауді, майже нічим не буде відрізнатися від виконаної локально (окрім, звісно, DevOps частини - деплой, менеджмент прав і т.п.). Проте використання Pyspark в клауді може виявитись занадто дорогим!
- Файли з даними для виконання проєкту знаходяться в директорії
data
- Вам потрібно створити бакет в GCS для зберігання сирих файлів (далі - raw-бакет) та скопіювати файли в нього.
- В Bigquery потрібно створити наступні датасети:
bronze
,silver
,gold
- В реальному проекті файли знаходяться на зовнішньому джерелі і відбувається стадія ingestion. Для спрощення ми вважаємо що файли вже знаходяться в raw-бакеті.
- Вам потрібно зробити проект в GCS. Оркестрація має віддбуватись Apache Airflow. Будь-яке переміщення чи трансформація даних має керуватись за допомогою Apache Airflow.
- Пул реквест має містити історію комітів згідно легенді проекта. якщо буде один коміт – буде 0 балів
Ви працюєте в Data відділі компанії, що займається продажем побутової електроніки.
Ваш відділ отримав завдання проаналізувати продажі за географічною
локацією покупців (по штатах) та за віком.
Для виконання цієї задачі вам вдалось домовитись про інтеграцію 2 джерел даних: customers
та sales
.
Для цього вам потрібно розробити 2 пайплайни даних:
Ви вирішили вставити дані з raw-бакета в bronze
за допомогою читання
файлу як зовнішньої таблиці (schema-on-read), адже формат файлу .csv
,
і такий спосіб є найбезпечнішим. Схему ви вирішили накласти
так, щоб будь-яке поле у файлі вважати STRING. В таблиці sales
в датасеті bronze
всі поля теж STRING.
Назви колонок в bronze
ви вирішили залишити з файлу джерела
для того, щоб полегшити пошук помилок, а саме:
CustomerId, PurchaseDate, Product, Price
При трансфері даних в silver
ви помітили, що дані мають деякий "бруд"
і вирішили їх почистити.
Схема в silver
вже має ті типи, які зручні для дослідження даних.
Колонки мають назви згідно правил, прийнятих в компанії, а саме:
client_id, purchase_date, product_name, price
Дані ви вирішили партиціонувати, так як даних багато і аналітикам потрібно робити виборки відштовхуючись від дати.
Дані вже приходять партиціоновані - окрема дата в окремій папці.
Це джерело даних приходить дивним чином: постачальник даних не погодився класти дані за кожен день в окрему папку, натомість він кожен зливає весь дамп таблиці кожного дня. Тобто, кожен наступний день містить дані за всі попередні дні.
Дані ви вирішуєте не партиціонувати, адже їх небагато.
Ви прокидуєте дані на рівні bronze
та silver
.
В silver
таблиця має наступні колонки:
client_id, first_name, last_name, email, registration_date, state
В bronze
назви колонки лишаються оригінальні (такі, як в CSV файлі)
Ви запускаєте обидва DAGи, всі дані трансферяться успішно.
Аналітики аналізують дані в silver
і помічають, що клієнти не заповнювали
деякі дані, тому вони порожні. Найважливіще - це назви штатів, бо
без них не можна робити аналітику по географічному розташуванню.
А дані про вік покупців взагалі відсутні.
Також, в планах компанії є впровадження нотифікацій покупців,
а записи імен та прізвищ заповнення не всюди (користувачі мають звичку
заповнювати або імʼя, або прізвище вибірково)
Щоб вирішити описані вище проблеми, ви домовляєтесь про інтеграцію
третього джерела даних: user_profiles
:
Ці дані постачальник доставляє в форматі JSONLine. Ви будуєте пайплайн і
трансферите їх до рівня silver
. Дані мають ідеальну якість.
Даний пайплайн запускається вручну, а не за розкладом.
Після того, як дані всі процесяться успішно, треба збагатити дані
customers
за допомогою даних з user_profiles
.
Це єдини пайплайн, який пише дані в рівень gold
.
В датасеті gold
має створюватись таблиця user_profiles_enriched
в результаті роботи цього пайплайну. Ця таблиця містить всі дані з таблиці
silver.customers
, але мають бути заповненні всі імена, прізвища, штати
даними з таблиці silver.user_profiles
. Також додатково потрібно дадати
всі поля, які є в silver.user_profiles
та не вистачає в silver.customers
(наприклад, phone_number
).
Цей пайплайн теж має запускатись вручну (не за розкладом)
За бажанням можна реалізувати, щоб 'process_user_profiles' при успішному виконанні запускав 'enrich_user_profiles'
В решті-решт, коли всі пайплайни побудовані, спробуйте дати відьповідь на наступну аналітичну задачу:
В якому штаті було куплено найбільше телевізорів покупцями від 20 до 30 років за першу декаду вересня?
- Поцікавтесь, що робить вираз
MERGE
в SQL. Можливо, він стане у нагоді. - В контексті виконання цього завдання ви маєте спробувати на практиці наступні дата інженерні підходи роботи з даними, а саме: Data Cleansing, Data Wrangling та Data Enrichment