You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
def download_video_blob(blob_ref, save_path, access_jwt, did=None):
    """Download a video blob via /xrpc/com.atproto.sync.getBlob and save it to disk.

    Args:
        blob_ref: Value of the '$link' field of the video object (a blob
            reference, i.e. the blob's CID).
        save_path: Path of the file the downloaded bytes are written to.
        access_jwt: Access token sent as a Bearer ``Authorization`` header.
        did: Optional DID of the repository that owns the blob. The
            ``com.atproto.sync.getBlob`` lexicon defines ``did`` and ``cid`` as
            its query parameters, so callers should pass it. When omitted, the
            legacy ``ref`` query parameter is sent so that existing callers
            keep their previous request shape.

    Returns:
        None. Failures are not raised; they are reported with ``print``.
    """
    endpoint = f"{BLUESKY_API_BASE_URL}/xrpc/com.atproto.sync.getBlob"
    headers = {
        'Authorization': f'Bearer {access_jwt}',
        'Accept': 'application/octet-stream',
    }
    if did:
        params = {"did": did, "cid": blob_ref}
    else:
        # Legacy request shape, kept only for backward compatibility.
        params = {"ref": blob_ref}
    try:
        print(f"Descargando vídeo con blob reference: {blob_ref}")
        # NOTE(review): assumes send_request returns a requests.Response-like
        # object (it is already used with iter_content) — confirm.
        response = send_request("GET", endpoint, headers=headers, params=params, stream=True)
        try:
            # Fail before touching the disk: without this check an HTTP error
            # body would be written to save_path and reported as a success.
            response.raise_for_status()
            with open(save_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        finally:
            # A streamed response holds its connection until it is closed.
            response.close()
        print(f"Vídeo descargado exitosamente en: {save_path}")
    except Exception as e:
        # Best-effort download: report the problem and let the caller go on.
        print(f"Error descargando el vídeo blob: {e}")
def process_post(item, access_jwt, csv_writer, headers_auth, image_folder, video_folder, profile_cache, image_counts, video_counts):
[.....]
video_link = "N/A"
embed_obj = record.get("embed", {})
if embed_obj.get("$type", "").startswith("app.bsky.embed.video"):
video_obj = embed_obj.get("video", {})
if video_obj:
blob_ref = video_obj.get("ref", {}).get("$link")
if blob_ref:
video_counts[username] = video_counts.get(username, 0) + 1
video_filename = os.path.join(video_folder, f"{username}_{video_counts[username]}.mp4")
download_video_blob(blob_ref, video_filename, access_jwt)
video_link = blob_ref
else:
print("No se encontró el blob reference del vídeo.")
else:
print("No se encontró el objeto video en el embed.")
else:
uri = base_post.get("uri", "")
last_part = uri.split("/")[-1] if uri else ""
[...]
The text was updated successfully, but these errors were encountered:
I tried this, but it is not working.
def download_video_blob(blob_ref, save_path, access_jwt, did=None):
    """Download a video blob via /xrpc/com.atproto.sync.getBlob and save it to disk.

    Args:
        blob_ref: Value of the '$link' field of the video object (a blob
            reference, i.e. the blob's CID).
        save_path: Path of the file the downloaded bytes are written to.
        access_jwt: Access token sent as a Bearer ``Authorization`` header.
        did: Optional DID of the repository that owns the blob. The
            ``com.atproto.sync.getBlob`` lexicon defines ``did`` and ``cid`` as
            its query parameters, so callers should pass it. When omitted, the
            legacy ``ref`` query parameter is sent so that existing callers
            keep their previous request shape.

    Returns:
        None. Failures are not raised; they are reported with ``print``.
    """
    endpoint = f"{BLUESKY_API_BASE_URL}/xrpc/com.atproto.sync.getBlob"
    headers = {
        'Authorization': f'Bearer {access_jwt}',
        'Accept': 'application/octet-stream',
    }
    if did:
        params = {"did": did, "cid": blob_ref}
    else:
        # Legacy request shape, kept only for backward compatibility.
        params = {"ref": blob_ref}
    try:
        print(f"Descargando vídeo con blob reference: {blob_ref}")
        # NOTE(review): assumes send_request returns a requests.Response-like
        # object (it is already used with iter_content) — confirm.
        response = send_request("GET", endpoint, headers=headers, params=params, stream=True)
        try:
            # Fail before touching the disk: without this check an HTTP error
            # body would be written to save_path and reported as a success.
            response.raise_for_status()
            with open(save_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        finally:
            # A streamed response holds its connection until it is closed.
            response.close()
        print(f"Vídeo descargado exitosamente en: {save_path}")
    except Exception as e:
        # Best-effort download: report the problem and let the caller go on.
        print(f"Error descargando el vídeo blob: {e}")
def process_post(item, access_jwt, csv_writer, headers_auth, image_folder, video_folder, profile_cache, image_counts, video_counts):
[.....]
video_link = "N/A"
embed_obj = record.get("embed", {})
if embed_obj.get("$type", "").startswith("app.bsky.embed.video"):
video_obj = embed_obj.get("video", {})
if video_obj:
blob_ref = video_obj.get("ref", {}).get("$link")
if blob_ref:
video_counts[username] = video_counts.get(username, 0) + 1
video_filename = os.path.join(video_folder, f"{username}_{video_counts[username]}.mp4")
download_video_blob(blob_ref, video_filename, access_jwt)
video_link = blob_ref
else:
print("No se encontró el blob reference del vídeo.")
else:
print("No se encontró el objeto video en el embed.")
else:
uri = base_post.get("uri", "")
last_part = uri.split("/")[-1] if uri else ""
[...]
The text was updated successfully, but these errors were encountered: