-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain_v3.py
89 lines (80 loc) · 3.11 KB
/
main_v3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import os
from typing import List
import boto3
from dotenv import load_dotenv
# Carrega as variáveis de ambiente do arquivo .env
load_dotenv()
# Configurações da AWS a partir do .env
AWS_ACCESS_KEY_ID: str = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY: str = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_REGION: str = os.getenv('AWS_REGION')
BUCKET_NAME: str = os.getenv('BUCKET_NAME')
# Print para verificar se as variáveis de ambiente foram carregadas corretamente
print(f"AWS_ACCESS_KEY_ID: {AWS_ACCESS_KEY_ID}")
print(f"AWS_SECRET_ACCESS_KEY: {AWS_SECRET_ACCESS_KEY}")
print(f"AWS_REGION: {AWS_REGION}")
print(f"BUCKET_NAME: {BUCKET_NAME}")
# Configura o cliente S3
try:
s3_client = boto3.client(
's3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION
)
print("Cliente S3 configurado com sucesso.")
except Exception as e:
print(f"Erro ao configurar o cliente S3: {e}")
raise
def listar_arquivos(pasta: str) -> List[str]:
"""Lista todos os arquivos em uma pasta local."""
arquivos: List[str] = []
try:
for nome_arquivo in os.listdir(pasta):
caminho_completo = os.path.join(pasta, nome_arquivo)
if os.path.isfile(caminho_completo):
arquivos.append(caminho_completo)
print(f"Arquivos listados na pasta '{pasta}': {arquivos}")
except Exception as e:
print(f"Erro ao listar arquivos na pasta '{pasta}': {e}")
raise
return arquivos
def upload_arquivos_para_s3(arquivos: List[str]) -> None:
"""Faz upload dos arquivos listados para o S3."""
for arquivo in arquivos:
nome_arquivo: str = os.path.basename(arquivo)
try:
print(f"Tentando fazer upload de '{nome_arquivo}' para o bucket '{BUCKET_NAME}'...")
s3_client.upload_file(arquivo, BUCKET_NAME, nome_arquivo)
print(f"{nome_arquivo} foi enviado para o S3.")
except Exception as e:
print(f"Erro ao enviar '{nome_arquivo}' para o S3: {e}")
raise
def deletar_arquivos_locais(arquivos: List[str]) -> None:
"""Deleta os arquivos locais após o upload."""
for arquivo in arquivos:
try:
os.remove(arquivo)
print(f"{arquivo} foi deletado do local.")
except Exception as e:
print(f"Erro ao deletar o arquivo '{arquivo}': {e}")
raise
def executar_backup(pasta: str) -> None:
"""Executa o processo completo de backup."""
try:
print(f"Iniciando o processo de backup para a pasta '{pasta}'...")
arquivos: List[str] = listar_arquivos(pasta)
if arquivos:
upload_arquivos_para_s3(arquivos)
deletar_arquivos_locais(arquivos)
else:
print("Nenhum arquivo encontrado para backup.")
except Exception as e:
print(f"Erro no processo de backup: {e}")
raise
if __name__ == "__main__":
PASTA_LOCAL: str = 'files' # Substitua pelo caminho da sua pasta local
try:
executar_backup(PASTA_LOCAL)
except Exception as e:
print(f"Erro ao executar o backup: {e}")