diff --git a/.github/workflows/Formatter.yml b/.github/workflows/Formatter.yml
deleted file mode 100644
index eabe54c..0000000
--- a/.github/workflows/Formatter.yml
+++ /dev/null
@@ -1,34 +0,0 @@
-name: Format code
-
-on: push
-
-jobs:
-  formatter:
-    name: formatter
-    runs-on: ubuntu-latest
-    strategy:
-      matrix:
-        python-version: [3.11.0]
-    steps:
-      - name: Checkout
-        uses: actions/checkout@v3
-        with:
-          ref: ${{ github.head_ref }}
-      - name: Set up Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v4
-        with:
-          python-version: ${{ matrix.python-version }}
-      - name: Install Dependencies
-        run: |
-          python -m pip install --upgrade pip
-          pip install autoflake black isort
-      - name: autoflake
-        run: autoflake -r .
-      - name: black
-        run: black .
-      - name: isort
-        run: isort .
-      - name: Auto Commit
-        uses: stefanzweifel/git-auto-commit-action@v4
-        with:
-          commit_message: Apply Code Formatter Change
diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml
index f5c99b8..ccc5b7a 100644
--- a/.github/workflows/pytest.yaml
+++ b/.github/workflows/pytest.yaml
@@ -13,15 +13,15 @@ jobs:
       - name: Set up Python
         uses: actions/setup-python@v2
         with:
-          python-version: 3.9 # Specify the Python version to use
+          python-version: 3.9
       - name: Install dependencies
         run: |
           python -m pip install --upgrade pip
           pip install pipenv
           pip install git+https://github.com/aoki-h-jp/binance-bulk-downloader
-          pipenv install --dev # Install dependencies using Pipenv
+          pipenv install --dev
       - name: Run pytest
         run: |
-          pipenv run pytest -v -s # Specify the command that runs pytest
+          pipenv run pytest -v -s
diff --git a/README.md b/README.md
index c1397cc..5702da8 100644
--- a/README.md
+++ b/README.md
@@ -1,19 +1,21 @@
 # binance-bulk-downloader
+
 [![Python 3.11](https://img.shields.io/badge/python-3.11-blue.svg)](https://www.python.org/downloads/release/python-3110//)
-[![Format code](https://github.com/aoki-h-jp/binance-bulk-downloader/actions/workflows/Formatter.yml/badge.svg?branch=main)](https://github.com/aoki-h-jp/binance-bulk-downloader/actions/workflows/Formatter.yml)
 [![pytest](https://github.com/aoki-h-jp/binance-bulk-downloader/actions/workflows/pytest.yaml/badge.svg)](https://github.com/aoki-h-jp/binance-bulk-downloader/actions/workflows/pytest.yaml)
 [![Github All Releases](https://img.shields.io/github/downloads/aoki-h-jp/binance-liquidation-feeder/total.svg)]()
 
 ## Python library for bulk downloading Binance historical data
+
 A Python library to efficiently and concurrently download historical data files from Binance. Supports all asset types (spot, USDT-M, COIN-M, options) and all data frequencies.
 ## Installation
 
 ```bash
-pip install git+https://github.com/aoki-h-jp/binance-bulk-downloader
+pip install binance-bulk-downloader
 ```
 
 ## Usage
+
 ### Download all klines 1m data (USDT-M futures)
 
 ```python
@@ -32,6 +34,31 @@ downloader = BinanceBulkDownloader(data_frequency='1h', asset='spot')
 downloader.run_download()
 ```
 
+### Download specific symbols only
+
+```python
+from binance_bulk_downloader.downloader import BinanceBulkDownloader
+
+# Download a single symbol
+downloader = BinanceBulkDownloader(
+    data_type="klines",
+    data_frequency="1h",
+    asset="spot",
+    timeperiod_per_file="daily",
+    symbols="BTCUSDT",
+)
+downloader.run_download()
+
+# Download multiple symbols
+downloader = BinanceBulkDownloader(
+    data_type="trades",
+    asset="spot",
+    timeperiod_per_file="daily",
+    symbols=["BTCUSDT", "ETHUSDT"],
+)
+downloader.run_download()
+```
+
 ### Download all aggTrades data (USDT-M futures)
 
 ```python
@@ -42,6 +69,7 @@ downloader.run_download()
 ```
 
 ### Other examples
+
 Please see the /example directory.
 
 ```bash
@@ -55,25 +83,26 @@ python -m pytest
 ```
 
 ## Available data types
+
 ✅: Implemented and tested. ❌: Not available on Binance.
 
 ### by data_type
 
-| data_type | spot | um | cm | options |
-| :------------------ | :--: | :--: | :--: | :-----: |
-| aggTrades | ✅ | ✅ | ✅ | ❌ |
-| bookDepth | ❌ | ✅ | ✅ | ❌ |
-| bookTicker | ❌ | ✅ | ✅ | ❌ |
-| fundingRate | ❌ | ✅ | ✅ | ❌ |
-| indexPriceKlines | ❌ | ✅ | ✅ | ❌ |
-| klines | ✅ | ✅ | ✅ | ❌ |
-| liquidationSnapshot | ❌ | ✅ | ✅ | ❌ |
-| markPriceKlines | ❌ | ✅ | ✅ | ❌ |
-| metrics | ❌ | ✅ | ✅ | ❌ |
-| premiumIndexKlines | ❌ | ✅ | ✅ | ❌ |
-| trades | ✅ | ✅ | ✅ | ❌ |
-| BVOLIndex | ❌ | ❌ | ❌ | ✅ |
-| EOHSummary | ❌ | ❌ | ❌ | ✅ |
+| data_type           | spot |  um  |  cm  | options |
+| :------------------ | :--: | :--: | :--: | :-----: |
+| aggTrades           |  ✅  |  ✅  |  ✅  |   ❌    |
+| bookDepth           |  ❌  |  ✅  |  ✅  |   ❌    |
+| bookTicker          |  ❌  |  ✅  |  ✅  |   ❌    |
+| fundingRate         |  ❌  |  ✅  |  ✅  |   ❌    |
+| indexPriceKlines    |  ❌  |  ✅  |  ✅  |   ❌    |
+| klines              |  ✅  |  ✅  |  ✅  |   ❌    |
+| liquidationSnapshot |  ❌  |  ✅  |  ✅  |   ❌    |
+| markPriceKlines     |  ❌  |  ✅  |  ✅  |   ❌    |
+| metrics             |  ❌  |  ✅  |  ✅  |   ❌    |
+| premiumIndexKlines  |  ❌  |  ✅  |  ✅  |   ❌    |
+| trades              |  ✅  |  ✅  |  ✅  |   ❌    |
+| BVOLIndex           |  ❌  |  ❌  |  ❌  |   ✅    |
+| EOHSummary          |  ❌  |  ❌  |  ❌  |   ✅    |
 
 ### by data_frequency (klines, indexPriceKlines, markPriceKlines, premiumIndexKlines)
 
@@ -97,9 +126,11 @@
 | 1mo | ✅ | ✅ | ✅ | ❌ |
 
 ## If you want to report a bug or request a feature
+
 Please create an issue on this repository!
 
 ## Disclaimer
+
 This project is for educational purposes only. You should not construe any such information or other material
 as legal, tax, investment, financial, or other advice. Nothing contained here constitutes a solicitation,
 recommendation, endorsement, or offer by me or any third party service provider to buy or sell any securities or other financial
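For orientation, downloaded files land under `destination_dir` mirroring the S3 key layout, and each zip is extracted to a CSV alongside it. A minimal sketch of reading one downloaded klines file with only the standard library — the path is hypothetical, and the column order assumes the usual Binance kline CSV layout (open time, open, high, low, close, volume, ...):

```python
import csv
from pathlib import Path

# Hypothetical path; the downloader mirrors the S3 key layout under destination_dir
csv_path = Path("data/spot/daily/klines/BTCUSDT/1h/BTCUSDT-1h-2024-01-01.csv")

with csv_path.open(newline="") as f:
    for row in csv.reader(f):
        # Binance kline rows start with: open time, open, high, low, close, volume
        open_time, open_, high, low, close, volume = row[:6]
        print(open_time, close)
```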
""" + import binance_bulk_downloader.downloader import binance_bulk_downloader.exceptions diff --git a/binance_bulk_downloader/downloader.py b/binance_bulk_downloader/downloader.py index ec07a28..620f433 100644 --- a/binance_bulk_downloader/downloader.py +++ b/binance_bulk_downloader/downloader.py @@ -1,24 +1,35 @@ """ Binance Bulk Downloader """ + # import standard libraries import os import zipfile from concurrent.futures import ThreadPoolExecutor from xml.etree import ElementTree from zipfile import BadZipfile +from typing import Optional, List, Union # import third-party libraries import requests -from rich import print -from rich.progress import track +from rich.console import Console +from rich.panel import Panel +from rich.live import Live +from rich.text import Text # import my libraries from binance_bulk_downloader.exceptions import ( - BinanceBulkDownloaderDownloadError, BinanceBulkDownloaderParamsError) + BinanceBulkDownloaderDownloadError, + BinanceBulkDownloaderParamsError, +) class BinanceBulkDownloader: + """ + Binance Bulk Downloader class for downloading historical data from Binance Vision. + Supports all asset types (spot, USDT-M, COIN-M, options) and all data frequencies. + """ + _CHUNK_SIZE = 100 _BINANCE_DATA_S3_BUCKET_URL = ( "https://s3-ap-northeast-1.amazonaws.com/data.binance.vision" @@ -114,97 +125,144 @@ def __init__( data_frequency="1m", asset="um", timeperiod_per_file="daily", + symbols: Optional[Union[str, List[str]]] = None, ) -> None: """ + Initialize BinanceBulkDownloader + :param destination_dir: Destination directory for downloaded files :param data_type: Type of data to download (klines, aggTrades, etc.) :param data_frequency: Frequency of data to download (1m, 1h, 1d, etc.) :param asset: Type of asset to download (um, cm, spot, option) :param timeperiod_per_file: Time period per file (daily, monthly) + :param symbols: Optional. Symbol or list of symbols to download (e.g., "BTCUSDT" or ["BTCUSDT", "ETHUSDT"]). + If None or empty list is provided, all available symbols will be downloaded. """ self._destination_dir = destination_dir self._data_type = data_type self._data_frequency = data_frequency self._asset = asset self._timeperiod_per_file = timeperiod_per_file + self._symbols = [symbols] if isinstance(symbols, str) else symbols self.marker = None self.is_truncated = True - self.downloaded_list = [] + self.downloaded_list: list[str] = [] + self.console = Console() def _check_params(self) -> None: """ Check params :return: None """ - if ( - self._data_type - not in self._DATA_TYPE_BY_ASSET[self._asset][self._timeperiod_per_file] - ): + # Check asset type first + if self._asset not in self._ASSET + self._FUTURES_ASSET + self._OPTIONS_ASSET: raise BinanceBulkDownloaderParamsError( - f"data_type must be {self._DATA_TYPE_BY_ASSET[self._asset][self._timeperiod_per_file]}." + f"asset must be {self._ASSET + self._FUTURES_ASSET + self._OPTIONS_ASSET}." + ) + + # Check time period + if self._timeperiod_per_file not in ["daily", "monthly"]: + raise BinanceBulkDownloaderParamsError( + "timeperiod_per_file must be daily or monthly." ) + # Check data frequency if self._data_frequency not in self._DATA_FREQUENCY: raise BinanceBulkDownloaderParamsError( f"data_frequency must be {self._DATA_FREQUENCY}." 
 
-    def _get_file_list_from_s3_bucket(self, prefix, marker=None, is_truncated=False):
+    def _get_file_list_from_s3_bucket(self, prefix):
         """
         Get file list from s3 bucket
         :param prefix: s3 bucket prefix
-        :param marker: marker
-        :param is_truncated: is truncated
         :return: list of files
         """
-        print(f"[bold blue]Get file list[/bold blue]: " + prefix)
-        params = {"prefix": prefix, "max-keys": 1000}
-        if marker:
-            params["marker"] = marker
+        files = []
+        marker = None
+        is_truncated = True
+        MAX_DISPLAY_FILES = 5
 
-        response = requests.get(self._BINANCE_DATA_S3_BUCKET_URL, params=params)
-        tree = ElementTree.fromstring(response.content)
+        with Live(refresh_per_second=4) as live:
+            status_text = Text(f"Getting file list: {prefix}")
+            live.update(Panel(status_text, style="blue"))
 
-        files = []
-        for content in tree.findall(
-            "{http://s3.amazonaws.com/doc/2006-03-01/}Contents"
-        ):
-            key = content.find("{http://s3.amazonaws.com/doc/2006-03-01/}Key").text
-            if key.endswith(".zip"):
-                files.append(key)
-                self.marker = key
+            while is_truncated:
+                params = {"prefix": prefix, "max-keys": 1000}
+                if marker:
+                    params["marker"] = marker
 
-        is_truncated_element = tree.find(
-            "{http://s3.amazonaws.com/doc/2006-03-01/}IsTruncated"
-        )
-        self.is_truncated = is_truncated_element.text == "true"
+                response = requests.get(self._BINANCE_DATA_S3_BUCKET_URL, params=params)
+                tree = ElementTree.fromstring(response.content)
+
+                for content in tree.findall(
+                    "{http://s3.amazonaws.com/doc/2006-03-01/}Contents"
+                ):
+                    key = content.find(
+                        "{http://s3.amazonaws.com/doc/2006-03-01/}Key"
+                    ).text
+                    # Advance the marker on every key so pagination cannot stall
+                    marker = key
+                    if key.endswith(".zip"):
+                        # Filter by symbols if multiple symbols are specified
+                        if isinstance(self._symbols, list) and len(self._symbols) > 1:
+                            if any(symbol.upper() in key for symbol in self._symbols):
+                                files.append(key)
+                        else:
+                            files.append(key)
+
+                # Update display (latest files and total count)
+                status_text.plain = f"Getting file list: {prefix}\nTotal files found: {len(files)}"
+                if files:
+                    status_text.append("\n\nLatest files:")
+                    for recent_file in files[-MAX_DISPLAY_FILES:]:
+                        status_text.append(f"\n{recent_file}")
+                live.update(Panel(status_text, style="blue"))
+
+                is_truncated_element = tree.find(
+                    "{http://s3.amazonaws.com/doc/2006-03-01/}IsTruncated"
+                )
+                is_truncated = (
+                    is_truncated_element is not None
+                    and is_truncated_element.text.lower() == "true"
+                )
 
-        return files
+            status_text.plain = (
+                f"File list complete: {prefix}\nTotal files found: {len(files)}"
+            )
+            if files:
+                status_text.append("\n\nLatest files:")
+                for recent_file in files[-MAX_DISPLAY_FILES:]:
+                    status_text.append(f"\n{recent_file}")
+            live.update(Panel(status_text, style="green"))
+            return files
 
     def _make_asset_type(self) -> str:
         """
@@ -244,8 +302,30 @@ def _build_prefix(self) -> str:
             self._timeperiod_per_file,
             self._data_type,
         ]
-        prefix = "/".join(url_parts)
-        return prefix
+
+        # If a single symbol is specified, add it to the prefix
+        if isinstance(self._symbols, list) and len(self._symbols) == 1:
+            symbol = self._symbols[0].upper()
+            url_parts.append(symbol)
+            # For trades and aggTrades, add the symbol directory
+            if self._data_type in ["trades", "aggTrades"]:
+                url_parts.append(symbol)
+        elif isinstance(self._symbols, str):
+            symbol = self._symbols.upper()
+            url_parts.append(symbol)
+            # For trades and aggTrades, add the symbol directory
+            if self._data_type in ["trades", "aggTrades"]:
+                url_parts.append(symbol)
+
+        # If a data frequency is required and specified, add it to the prefix
+        if (
+            self._data_type in self._DATA_FREQUENCY_REQUIRED_BY_DATA_TYPE
+            and self._data_frequency
+        ):
+            if isinstance(self._symbols, (str, list)):
+                url_parts.append(self._data_frequency)
+
+        return "/".join(url_parts)
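Concretely, the branches above yield listing prefixes like the following — a sketch assuming the unchanged head of `url_parts` is `["data", <asset type>, ...]`, which the `startswith("data/spot")` assertion in the tests below corroborates:

```python
from binance_bulk_downloader.downloader import BinanceBulkDownloader

d = BinanceBulkDownloader(
    data_type="klines",
    data_frequency="1h",
    asset="spot",
    timeperiod_per_file="daily",
    symbols="BTCUSDT",
)
# Expected: data/spot/daily/klines/BTCUSDT/1h
print(d._build_prefix())

# For trades/aggTrades the symbol is appended twice, so a prefix such as
# data/spot/daily/trades/BTCUSDT/BTCUSDT matches keys like
# BTCUSDT-trades-2024-01-01.zip inside the symbol directory.
```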
 
     def _download(self, prefix) -> None:
         """
@@ -253,50 +333,77 @@ def _download(self, prefix) -> None:
         :param prefix: s3 bucket prefix
         :return: None
         """
-        self._check_params()
-        zip_destination_path = os.path.join(self._destination_dir, prefix)
-        csv_destination_path = os.path.join(
-            self._destination_dir, prefix.replace(".zip", ".csv")
-        )
+        try:
+            self._check_params()
+            zip_destination_path = os.path.join(self._destination_dir, prefix)
+            csv_destination_path = os.path.join(
+                self._destination_dir, prefix.replace(".zip", ".csv")
+            )
 
-        # Make directory if not exists
-        if not os.path.exists(os.path.dirname(zip_destination_path)):
-            os.makedirs(os.path.dirname(zip_destination_path))
+            # Make the directory if it does not exist
+            if not os.path.exists(os.path.dirname(zip_destination_path)):
+                try:
+                    os.makedirs(os.path.dirname(zip_destination_path))
+                except (PermissionError, OSError) as e:
+                    raise BinanceBulkDownloaderDownloadError(
+                        f"Directory creation error: {str(e)}"
+                    )
 
-        # Don't download if already exists
-        if os.path.exists(csv_destination_path):
-            print(f"[yellow]Already exists: {csv_destination_path}[/yellow]")
-            return
+            # Don't download if the file already exists
+            if os.path.exists(csv_destination_path):
+                return
 
-        url = f"{self._BINANCE_DATA_DOWNLOAD_BASE_URL}/{prefix}"
-        print(f"[bold blue]Downloading {url}[/bold blue]")
-        try:
-            response = requests.get(url, zip_destination_path)
-            print(f"[green]Downloaded: {url}[/green]")
-        except requests.exceptions.HTTPError:
-            print(f"[red]HTTP Error: {url}[/red]")
-            return None
+            url = f"{self._BINANCE_DATA_DOWNLOAD_BASE_URL}/{prefix}"
 
-        with open(zip_destination_path, "wb") as file:
-            for chunk in response.iter_content(chunk_size=8192):
-                file.write(chunk)
+            try:
+                response = requests.get(url)
+                response.raise_for_status()
+            except (
+                requests.exceptions.RequestException,
+                requests.exceptions.HTTPError,
+                requests.exceptions.ConnectionError,
+                requests.exceptions.Timeout,
+            ) as e:
+                raise BinanceBulkDownloaderDownloadError(f"Download error: {str(e)}")
 
-        try:
-            unzipped_path = "/".join(zip_destination_path.split("/")[:-1])
-            with zipfile.ZipFile(zip_destination_path) as existing_zip:
-                existing_zip.extractall(
-                    csv_destination_path.replace(csv_destination_path, unzipped_path)
-                )
-            print(f"[green]Unzipped: {zip_destination_path}[/green]")
-        except BadZipfile:
-            print(f"[red]Bad Zip File: {zip_destination_path}[/red]")
-            os.remove(zip_destination_path)
-            print(f"[green]Removed: {zip_destination_path}[/green]")
-            raise BinanceBulkDownloaderDownloadError
+            try:
+                with open(zip_destination_path, "wb") as file:
+                    for chunk in response.iter_content(chunk_size=8192):
+                        file.write(chunk)
+            except OSError as e:
+                raise BinanceBulkDownloaderDownloadError(f"File write error: {str(e)}")
+
+            try:
+                unzipped_path = "/".join(zip_destination_path.split("/")[:-1])
+                with zipfile.ZipFile(zip_destination_path) as existing_zip:
+                    existing_zip.extractall(unzipped_path)
+            except BadZipfile:
+                if os.path.exists(zip_destination_path):
+                    os.remove(zip_destination_path)
+                raise BinanceBulkDownloaderDownloadError(
+                    f"Bad Zip File: {zip_destination_path}"
+                )
+            except OSError as e:
+                if os.path.exists(zip_destination_path):
+                    os.remove(zip_destination_path)
+                raise BinanceBulkDownloaderDownloadError(f"Unzip error: {str(e)}")
 
-        # Delete zip file
-        os.remove(zip_destination_path)
-        print(f"[green]Removed: {zip_destination_path}[/green]")
+            # Delete the zip file
+            try:
+                os.remove(zip_destination_path)
+            except OSError as e:
+                raise BinanceBulkDownloaderDownloadError(
+                    f"File removal error: {str(e)}"
+                )
+
+        except Exception as e:
+            if not isinstance(e, BinanceBulkDownloaderDownloadError):
+                raise BinanceBulkDownloaderDownloadError(f"Unexpected error: {str(e)}")
+            raise
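With the errors consolidated into `BinanceBulkDownloaderDownloadError`, callers only need one except clause. A usage sketch — the key below is hypothetical; real keys come from `_get_file_list_from_s3_bucket`:

```python
from binance_bulk_downloader.downloader import BinanceBulkDownloader
from binance_bulk_downloader.exceptions import BinanceBulkDownloaderDownloadError

downloader = BinanceBulkDownloader(
    data_type="klines",
    data_frequency="1h",
    asset="spot",
    timeperiod_per_file="daily",
)
try:
    downloader._download("data/spot/daily/klines/BTCUSDT/1h/BTCUSDT-1h-2024-01-01.zip")
except BinanceBulkDownloaderDownloadError as e:
    print(f"Download failed: {e}")
```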
 
     @staticmethod
     def make_chunks(lst, n) -> list:
         """
@@ -313,22 +420,58 @@ def run_download(self):
         Download concurrently
         :return: None
         """
-        print(f"[bold blue]Downloading {self._data_type}[/bold blue]")
+        self.console.print(
+            Panel(f"Starting download for {self._data_type}", style="blue bold")
+        )
 
-        while self.is_truncated:
-            file_list_generator = self._get_file_list_from_s3_bucket(
-                self._build_prefix(), self.marker, self.is_truncated
-            )
-            if self._data_type in self._DATA_FREQUENCY_REQUIRED_BY_DATA_TYPE:
-                file_list_generator = [
-                    prefix
-                    for prefix in file_list_generator
-                    if prefix.count(self._data_frequency) == 2
-                ]
-            for prefix_chunk in track(
-                self.make_chunks(file_list_generator, self._CHUNK_SIZE),
-                description="Downloading",
-            ):
+        file_list = []
+        # Handle multiple symbols by getting each symbol's files separately
+        if isinstance(self._symbols, list) and len(self._symbols) > 1:
+            original_symbols = self._symbols
+            for symbol in original_symbols:
+                self._symbols = symbol  # Temporarily set to a single symbol
+                symbol_files = self._get_file_list_from_s3_bucket(self._build_prefix())
+                file_list.extend(symbol_files)
+            self._symbols = original_symbols  # Restore the original symbols
+        else:
+            file_list = self._get_file_list_from_s3_bucket(self._build_prefix())
+
+        # Filter by data frequency only if not already filtered by prefix
+        if (
+            self._data_type in self._DATA_FREQUENCY_REQUIRED_BY_DATA_TYPE
+            and not isinstance(self._symbols, (str, list))
+        ):
+            file_list = [
+                prefix
+                for prefix in file_list
+                if prefix.count(self._data_frequency) == 2
+            ]
+
+        # Create progress display
+        with Live(refresh_per_second=4) as live:
+            status = Text()
+            chunks = self.make_chunks(file_list, self._CHUNK_SIZE)
+            total_chunks = len(chunks)
+
+            # Download files in chunks
+            for chunk_index, prefix_chunk in enumerate(chunks, 1):
                 with ThreadPoolExecutor() as executor:
-                    executor.map(self._download, prefix_chunk)
+                    futures = []
+                    for prefix in prefix_chunk:
+                        future = executor.submit(self._download, prefix)
+                        futures.append((future, prefix))
+
+                    # Update status as files complete
+                    for future, prefix in futures:
+                        try:
+                            future.result()
+                            progress = (
+                                (len(self.downloaded_list) + 1) / len(file_list) * 100
+                            )
+                            status.plain = f"[{chunk_index}/{total_chunks}] Progress: {progress:.1f}% | Latest: {os.path.basename(prefix)}"
+                            live.update(status)
+                        except Exception as e:
+                            status.plain = f"Error: {str(e)}"
+                            live.update(status)
+                self.downloaded_list.extend(prefix_chunk)
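`make_chunks` (unchanged by this diff) batches the key list so each batch gets its own thread pool. A sketch of the expected behavior, assuming `n` is the chunk size as the `_CHUNK_SIZE = 100` call site suggests:

```python
from binance_bulk_downloader.downloader import BinanceBulkDownloader

# Seven keys in batches of three -> [[0, 1, 2], [3, 4, 5], [6]]
print(BinanceBulkDownloader.make_chunks(list(range(7)), 3))
```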
diff --git a/example/download_spot_symbols.py b/example/download_spot_symbols.py
new file mode 100644
index 0000000..fa86d7e
--- /dev/null
+++ b/example/download_spot_symbols.py
@@ -0,0 +1,33 @@
+"""
+Download spot market data for specific symbols
+"""
+
+from binance_bulk_downloader.downloader import BinanceBulkDownloader
+
+# Download a single symbol (BTCUSDT) from the spot market
+downloader = BinanceBulkDownloader(
+    data_type="klines",
+    data_frequency="1h",
+    asset="spot",
+    timeperiod_per_file="daily",
+    symbols="BTCUSDT",
+)
+downloader.run_download()
+
+# Download multiple symbols (BTCUSDT and ETHUSDT) from the spot market
+downloader = BinanceBulkDownloader(
+    data_type="trades",
+    asset="spot",
+    timeperiod_per_file="daily",
+    symbols=["BTCUSDT", "ETHUSDT"],
+)
+downloader.run_download()
+
+# Download aggTrades for multiple symbols
+downloader = BinanceBulkDownloader(
+    data_type="aggTrades",
+    asset="spot",
+    timeperiod_per_file="daily",
+    symbols=["BTCUSDT", "ETHUSDT"],
+)
+downloader.run_download()
diff --git a/requirements.txt b/requirements.txt
index 27d858b..5d1ff91 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-requests~=2.28.2
-setuptools~=68.1.2
+requests~=2.32.0
+setuptools~=70.0.0
 rich~=10.16.2
 pytest~=4.6.11
\ No newline at end of file
diff --git a/setup.py b/setup.py
index c498a29..250a18a 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
 setup(
     name="binance-bulk-downloader",
-    version="1.0.5",
+    version="1.1.0",
     description="A Python library to efficiently and concurrently download historical data files from Binance. Supports all asset types (spot, futures, options) and all frequencies.",
     install_requires=["requests", "rich", "pytest"],
     author="aoki-h-jp",
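A quick way to confirm the version bump once a release is installed — this assumes the distribution is available under its setup.py name:

```python
from importlib.metadata import version

# Should print "1.1.0" for this release
print(version("binance-bulk-downloader"))
```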
diff --git a/tests/test_error_cases.py b/tests/test_error_cases.py
new file mode 100644
index 0000000..579d33c
--- /dev/null
+++ b/tests/test_error_cases.py
@@ -0,0 +1,135 @@
+"""
+Test error cases for BinanceBulkDownloader
+"""
+
+import os
+import pytest
+import requests
+from unittest.mock import patch, MagicMock
+from zipfile import BadZipfile
+
+from binance_bulk_downloader.downloader import BinanceBulkDownloader
+from binance_bulk_downloader.exceptions import (
+    BinanceBulkDownloaderDownloadError,
+    BinanceBulkDownloaderParamsError,
+)
+
+
+class TestBinanceBulkDownloaderErrors:
+    @pytest.fixture
+    def downloader(self):
+        return BinanceBulkDownloader()
+
+    def test_invalid_data_type(self, downloader):
+        """Test case for invalid data type"""
+        downloader._data_type = "invalid_type"
+        with pytest.raises(BinanceBulkDownloaderParamsError) as exc_info:
+            downloader._check_params()
+        assert "data_type must be" in str(exc_info.value)
+
+    def test_invalid_asset(self, downloader):
+        """Test case for invalid asset type"""
+        downloader._asset = "invalid_asset"
+        with pytest.raises(BinanceBulkDownloaderParamsError) as exc_info:
+            downloader._check_params()
+        assert "asset must be" in str(exc_info.value)
+
+    def test_invalid_timeperiod(self, downloader):
+        """Test case for invalid time period"""
+        downloader._timeperiod_per_file = "invalid_period"
+        with pytest.raises(BinanceBulkDownloaderParamsError) as exc_info:
+            downloader._check_params()
+        assert "timeperiod_per_file must be daily or monthly" in str(exc_info.value)
+
+    def test_invalid_data_frequency(self, downloader):
+        """Test case for invalid data frequency"""
+        downloader._data_frequency = "invalid_frequency"
+        with pytest.raises(BinanceBulkDownloaderParamsError) as exc_info:
+            downloader._check_params()
+        assert "data_frequency must be" in str(exc_info.value)
+
+    def test_1s_frequency_non_spot(self, downloader):
+        """Test case for using 1s frequency with a non-spot asset"""
+        downloader._data_frequency = "1s"
+        downloader._asset = "um"
+        with pytest.raises(BinanceBulkDownloaderParamsError) as exc_info:
+            downloader._check_params()
+        assert "data_frequency 1s is not supported" in str(exc_info.value)
+
+    @patch("requests.get")
+    @patch("os.path.exists")
+    @patch("os.makedirs")
+    def test_network_error(self, mock_makedirs, mock_exists, mock_get, downloader):
+        """Test case for HTTP network error"""
+        mock_exists.return_value = False
+        mock_makedirs.return_value = None
+        mock_response = MagicMock()
+        mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError()
+        mock_get.return_value = mock_response
+        with pytest.raises(BinanceBulkDownloaderDownloadError):
+            downloader._download("test/prefix/file.zip")
+
+    @patch("requests.get")
+    @patch("os.path.exists")
+    @patch("os.makedirs")
+    def test_connection_timeout(self, mock_makedirs, mock_exists, mock_get, downloader):
+        """Test case for connection timeout"""
+        mock_exists.return_value = False
+        mock_makedirs.return_value = None
+        mock_get.side_effect = requests.exceptions.Timeout()
+        with pytest.raises(BinanceBulkDownloaderDownloadError):
+            downloader._download("test/prefix/file.zip")
+
+    @patch("requests.get")
+    @patch("os.path.exists")
+    @patch("os.makedirs")
+    def test_connection_error(self, mock_makedirs, mock_exists, mock_get, downloader):
+        """Test case for connection error"""
+        mock_exists.return_value = False
+        mock_makedirs.return_value = None
+        mock_get.side_effect = requests.exceptions.ConnectionError()
+        with pytest.raises(BinanceBulkDownloaderDownloadError):
+            downloader._download("test/prefix/file.zip")
+
+    @patch("requests.get")
+    @patch("os.path.exists")
+    @patch("os.makedirs")
+    @patch("zipfile.ZipFile")
+    def test_bad_zip_file(
+        self, mock_zipfile, mock_makedirs, mock_exists, mock_get, downloader
+    ):
+        """Test case for a corrupted ZIP file"""
+        mock_exists.return_value = False
+        mock_makedirs.return_value = None
+        mock_response = MagicMock()
+        mock_response.iter_content.return_value = [b"dummy content"]
+        mock_get.return_value = mock_response
+        mock_zipfile.side_effect = BadZipfile()
+        with pytest.raises(BinanceBulkDownloaderDownloadError):
+            downloader._download("test/prefix/file.zip")
+
+    @patch("requests.get")
+    @patch("os.path.exists")
+    @patch("os.makedirs")
+    def test_permission_error(self, mock_makedirs, mock_exists, mock_get, downloader):
+        """Test case for directory permission error"""
+        mock_exists.return_value = False
+        mock_makedirs.side_effect = PermissionError()
+        with pytest.raises(BinanceBulkDownloaderDownloadError):
+            downloader._download("test/prefix/file.zip")
+
+    @patch("requests.get")
+    @patch("os.path.exists")
+    @patch("os.makedirs")
+    def test_disk_space_error(self, mock_makedirs, mock_exists, mock_get, downloader):
+        """Test case for insufficient disk space"""
+        mock_exists.return_value = False
+        mock_makedirs.return_value = None
+        mock_response = MagicMock()
+        mock_response.iter_content.return_value = [b"dummy content"]
+        mock_get.return_value = mock_response
+
+        m = patch("builtins.open", side_effect=OSError(28, "No space left on device"))
+        with m:
+            with pytest.raises(BinanceBulkDownloaderDownloadError):
+                downloader._download("test/prefix/file.zip")
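To run just the new error-case suite from Python (equivalent to the CI's `pipenv run pytest -v -s` scoped to one file):

```python
import sys

import pytest

# pytest.main returns an exit code suitable for sys.exit
sys.exit(pytest.main(["-v", "-s", "tests/test_error_cases.py"]))
```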
diff --git a/tests/test_spot_symbols.py b/tests/test_spot_symbols.py
new file mode 100644
index 0000000..a2fa299
--- /dev/null
+++ b/tests/test_spot_symbols.py
@@ -0,0 +1,146 @@
+"""
+Test spot market symbols filtering
+"""
+
+import pytest
+from unittest.mock import patch
+
+from binance_bulk_downloader.downloader import BinanceBulkDownloader
+
+
+@pytest.fixture
+def mock_s3_response():
+    """Mock S3 response"""
+    return {
+        "BTCUSDT": [
+            "data/spot/daily/klines/BTCUSDT/1h/BTCUSDT-1h-2024-01-01.zip",
+            "data/spot/daily/klines/BTCUSDT/1h/BTCUSDT-1h-2024-01-02.zip",
+        ],
+        "ETHUSDT": [
+            "data/spot/daily/klines/ETHUSDT/1h/ETHUSDT-1h-2024-01-01.zip",
+            "data/spot/daily/klines/ETHUSDT/1h/ETHUSDT-1h-2024-01-02.zip",
+        ],
+    }
+
+
+def dynamic_spot_symbols_test_params():
+    """
+    Generate params for spot symbols tests
+    :return: test parameters
+    """
+    test_cases = [
+        # Single symbol klines
+        ("klines", "1h", "daily", "BTCUSDT", True),
+        # Multiple symbols klines
+        ("klines", "1h", "daily", ["BTCUSDT", "ETHUSDT"], True),
+        # Multiple symbols trades
+        ("trades", None, "daily", ["BTCUSDT", "ETHUSDT"], True),
+        # Multiple symbols aggTrades
+        ("aggTrades", None, "daily", ["BTCUSDT", "ETHUSDT"], True),
+        # Invalid symbol
+        ("klines", "1h", "daily", "INVALID_SYMBOL", False),
+        # Empty symbols list (no filtering)
+        ("klines", "1h", "daily", [], True),
+    ]
+
+    for (
+        data_type,
+        data_frequency,
+        timeperiod_per_file,
+        symbols,
+        should_pass,
+    ) in test_cases:
+        yield pytest.param(
+            data_type,
+            data_frequency,
+            timeperiod_per_file,
+            symbols,
+            should_pass,
+            id=f"{data_type}-{symbols}-{should_pass}",
+        )
+
+
+@pytest.mark.parametrize(
+    "data_type, data_frequency, timeperiod_per_file, symbols, should_pass",
+    dynamic_spot_symbols_test_params(),
+)
+def test_spot_symbols(
+    mock_s3_response,
+    tmpdir,
+    data_type,
+    data_frequency,
+    timeperiod_per_file,
+    symbols,
+    should_pass,
+):
+    """
+    Test spot market symbols filtering
+    :param mock_s3_response: mock S3 response
+    :param tmpdir: temporary directory
+    :param data_type: type of data to download
+    :param data_frequency: frequency of data
+    :param timeperiod_per_file: time period per file
+    :param symbols: symbol or list of symbols
+    :param should_pass: whether the test should pass validation
+    """
+    params = {
+        "destination_dir": tmpdir,
+        "data_type": data_type,
+        "asset": "spot",
+        "timeperiod_per_file": timeperiod_per_file,
+        "symbols": symbols,
+    }
+    if data_frequency:
+        params["data_frequency"] = data_frequency
+
+    downloader = BinanceBulkDownloader(**params)
+    downloader._check_params()
+
+    # Build the prefix
+    prefix = downloader._build_prefix()
+    assert isinstance(prefix, str), "Prefix should be a string"
+    assert prefix.startswith("data/spot"), "Prefix should start with data/spot"
+
+    # Mock file list
+    def mock_get_file_list(self, prefix):
+        if isinstance(symbols, str):
+            return mock_s3_response.get(symbols, [])
+        elif not symbols:
+            # An empty symbols list means no filtering
+            all_files = []
+            for files in mock_s3_response.values():
+                all_files.extend(files)
+            return all_files
+        else:
+            # Multiple symbols: combine the files for the specified symbols
+            files = []
+            for symbol in symbols:
+                files.extend(mock_s3_response.get(symbol, []))
+            return files
+
+    # Mock _get_file_list_from_s3_bucket
+    with patch.object(
+        BinanceBulkDownloader, "_get_file_list_from_s3_bucket", mock_get_file_list
+    ):
+        file_list = downloader._get_file_list_from_s3_bucket(prefix)
+
+    if not should_pass:
+        assert (
+            len(file_list) == 0
+        ), f"File list should be empty for invalid symbol {symbols}"
+        return
+
+    if isinstance(symbols, str):
+        symbol_list = [symbols]
+    elif not symbols:
+        # An empty symbols list means no filtering
+        assert len(file_list) > 0, "File list should not be empty for no filtering"
+        return
+    else:
+        symbol_list = symbols
+
+    # Check that each file in the file list contains one of the specified symbols
+    for file in file_list:
+        assert any(
+            symbol in file for symbol in symbol_list
+        ), f"File {file} should contain one of the symbols {symbol_list}"
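The suite above only asserts that single-symbol prefixes start with `data/spot`; a tighter check of the exact prefix could look like the sketch below, assuming the spot asset maps to the `data/spot` layout used by the mock keys:

```python
import pytest

from binance_bulk_downloader.downloader import BinanceBulkDownloader


@pytest.mark.parametrize("symbols", ["BTCUSDT", ["BTCUSDT"]])
def test_build_prefix_single_symbol(symbols):
    downloader = BinanceBulkDownloader(
        data_type="klines",
        data_frequency="1h",
        asset="spot",
        timeperiod_per_file="daily",
        symbols=symbols,
    )
    # Symbol and frequency are appended after the klines directory
    assert downloader._build_prefix() == "data/spot/daily/klines/BTCUSDT/1h"
```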