-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain_v2.py
97 lines (84 loc) · 3.26 KB
/
main_v2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import os
from typing import List
import boto3
from dotenv import load_dotenv
import logging
# Configuração de logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Carrega as variáveis de ambiente do arquivo .env
load_dotenv()
AWS_ACCESS_KEY_ID: str = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY: str = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_REGION: str = os.getenv("AWS_REGION")
AWS_BUCKET_NAME: str = os.getenv("AWS_BUCKET_NAME")
# Cria client S3
try:
s3_client = boto3.client(
"s3",
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION
)
logger.info("Cliente S3 inicializado com sucesso.")
except Exception as e:
logger.error("Erro ao inicializar cliente S3: %s", e)
raise
# Lista arquivos na pasta
def listar_arquivos(pasta: str) -> List:
arquivos: List = [] # Inicializa fora do bloco try
try:
for arquivo in os.listdir(pasta):
caminho_completo = os.path.join(pasta, arquivo)
if os.path.isfile(caminho_completo):
arquivos.append(caminho_completo)
except Exception as e:
logger.error("Erro ao listar arquivos na pasta '%s': %s", pasta, e)
print(arquivos)
return arquivos # Garante que arquivos será retornado, mesmo se ocorrer um erro
# Faz upload de arquivos para o S3
def upload_to_s3(arquivos: List):
if not arquivos: # Verifica se a lista está vazia
logger.warning("Nenhum arquivo para carregar no S3.")
return
logger.info("Carregando %d arquivos no S3.", len(arquivos))
for arquivo in arquivos:
try:
if not os.path.isfile(arquivo): # Verifica se o arquivo realmente existe
logger.warning("Arquivo '%s' não encontrado. Ignorando.", arquivo)
continue
nome_arquivo = os.path.basename(arquivo)
print(nome_arquivo)
s3_client.upload_file(arquivo, AWS_BUCKET_NAME, arquivo)
logger.info("Arquivo '%s' carregado com sucesso no S3.", nome_arquivo)
except Exception as e:
logger.error("Erro ao carregar o arquivo '%s' para o S3: %s", arquivo, e)
# Deleta arquivos locais
def deleta_local(arquivos: List):
for arquivo in arquivos:
try:
os.remove(arquivo)
logger.info("Arquivo '%s' deletado com sucesso.", arquivo)
except Exception as e:
logger.error("Erro ao deletar o arquivo '%s': %s", arquivo, e)
# Pipeline principal
def pipeline(pasta: str):
try:
arquivos: List = listar_arquivos(pasta)
if arquivos:
try:
upload_to_s3(arquivos)
except Exception as e:
logger.error("Erro ao carregar arquivos para o S3: %s", e)
try:
deleta_local(arquivos)
except Exception as e:
logger.error("Erro ao executar a pipeline: %s", e)
else:
logger.info("Nenhum arquivo encontrado na pasta '%s'.", pasta)
except Exception as e:
logger.error("Erro na execução da pipeline: %s", e)
if __name__ == '__main__':
logger.info("Inicializando pipeline.")
pasta = 'files'
pipeline(pasta)