Este repositório contém o 1º desafio da mentoria individual do Programa Desenvolve 2023 - Trilha Dados. O desafio se trata do desenvolvimento de um ETL simples, onde se buscou extrair, processar e carregar dados da API pública do Spotify em um banco de dados PostgreSQL, usando a linguagem Python.
Executar projeto | Desafio | Tecnologias | Dados | Projeto ETL
Para instalar as bibliotecas necessárias para executar este projeto, deve ser usado o arquivo requirements.txt
. Para fazer isso, abra o terminal, navegue até a pasta do seu priojeto e execute o seguinte comando:
pip install -r requirements.txt
Feito isso, podemos executar o arquivo main.py
usando o comando:
python3 main.py
-
Utilizar Python para ler dados da API do Spotify e encontrar episódios onde a palavra "Python" aparecer.
-
Armazenar no banco de dados Postgresql as seguintes informações:
- id do episodio;
- descrição;
- link;
- lista de imagens
-
Baixar as imagens dos eposódios a partir da lista de imagens gravadas no banco de dados e gravar em uma pasta dentro do projeto.
- Padronização do código;
- Identação;
- Versionamento;
- Programação Orientada a Objetos
-
Fonte dos dados: Spotify API
-
Documentação: documentação
PARTE 1 - Utilizar Python para ler dados da API do Spotify e encontrar episódios onde a palavra "Python" aparecer.
Para se conectar e ler dados da API do Spotify, foi desenvolvida a classe Spotipy
, contendo as funções de altenticação e busca de espisódios usando a biblioteca spotipy:
class Spotipy():
def __init__ (self, CLIENT_ID, CLIENT_SECRET):
self.CLIENT_ID = CLIENT_ID
self.CLIENT_SECRET = CLIENT_SECRET
def authentication(self):
auth_manager = SpotifyClientCredentials(
client_id = self.CLIENT_ID,
client_secret = self.CLIENT_SECRET
)
self.sp = spotipy.Spotify(auth_manager = auth_manager)
return self.sp
def get_all_episodes_with_python(self, sp):
self.episodes = []
self.offset = 0
self.limit = 50 # max limit
self.market='BR'
while True:
self.results = sp.search(q='Python', type='episode', limit=self.limit, offset=self.offset, market=self.market)
self.episodes += self.results['episodes']['items']
self.offset += self.limit
if len(self.results['episodes']['items']) == 0:
break
return self.episodes
PARTE 2 - Armazenar no banco de dados Postgresql as seguintes informações: id do episodio, descrição, link, lista de imagens
A transformação dos dados foi realizada usando listas e tuplas, tipos de dados nativos do Python. A motivação do uso desses tipos de dados foi a performance, seja na manipulação ou na inserção desses dados no banco de dados usando BULK INSERT.
episode = tuple()
episode_list = list()
for ep in episodes:
episode = ep['id'], ep['description'], ep['external_urls']['spotify'], ep['href']
episode_list.append(episode)
image = tuple()
image_list = list()
for ep in episodes:
for j, im in enumerate(ep['images']):
image = ep['id'], j+1, im['height'], im['width'], im['url']
image_list.append(image)
Para as funções relacionadas ao banco de dados, foi usada a biblioteca psycopg2. Todas essas funções, desde a conexão com o banco, criação das tabelas, inserção e consulta aos dados, estão contidas na classe Database
:
class Database:
def __init__ (self, HOST, DATABASE, USER, PASSWORD):
print('Connecting to spotifydb...')
self.HOST=HOST
self.DATABASE=DATABASE
self.USER=USER
self.PASSWORD=PASSWORD
def connect_db(self):
self.conn = psycopg2.connect(
host=self.HOST,
database=self.DATABASE,
user=self.USER,
password=self.PASSWORD
)
return self.conn
def create_db(self, sql):
self.conn = self.connect_db()
self.cur = self.conn.cursor()
self.cur.execute(sql)
self.conn.commit()
self.conn.close()
def insert_db(self, sql):
self.conn = self.connect_db()
self.cur = self.conn.cursor()
try:
self.cur.execute(sql)
self.conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print(f"Error: {error}")
self.conn.rollback()
self.cur.close()
return 1
self.cur.close()
def bulk_insert_db(self, sql, data):
self.conn = self.connect_db()
self.cur = self.conn.cursor()
psycopg2.extras.execute_values(self.cur, sql, data)
self.conn.commit()
self.cur.close()
self.conn.close()
def select_db(self, sql):
self.conn = self.connect_db()
self.cur = self.conn.cursor()
self.cur.execute(sql)
self.recset = self.cur.fetchall()
self.records = []
for rec in self.recset:
self.records.append(rec)
self.conn.close()
return self.records
PARTE 3 - Baixar as imagens dos eposódios a partir da lista de imagens gravadas no banco de dados e gravar em um diretório dentro do projeto
Para o download das imagens, foi usada a biblioteca requests, enquando a biblioteca os foi usada para criação das pastas e gravação dos arquivos das imagens:
for i, row in df_images.iterrows():
response = requests.get(row['url'])
image_filename = f"{row['id']}_{i}.jpg"
id = ''
if id != row['id']:
id = row['id']
path = f'images/{id}'
if not os.path.exists(f'{path}/'):
os.mkdir(f'{path}/')
image_path = os.path.join(f'images/{id}', image_filename)
with open(image_path, 'wb') as f:
f.write(response.content)