Skip to content

Commit

Permalink
Fix get last portfolio holdings bug (#476)
Browse files Browse the repository at this point in the history
* fix: getting last_portfolio if project_Id is None

* feat: several logging and patches

* fix: (part) cloud status of project command

* First attempt to solve the bug

* Add fix

* address requested changes

* Fix bug

* Fix bugs

* Fix bug #468

* Address requested changes

* fix bug in cloud_last_time

* Fix bugs
- Since you must always choose a data feed when deploying a local live algo, data_provider_live will never be zero.
- Since the date used to determine the last local deployment (in live_utils.py get_last_portfolio) comes from the output directory, we cannot create the output directory before calling get_last_portfolio because then we would get always the local holdings in the get_last_portfolio_method
- The output directory for the previous state file (see _get_last_portfolio) comes with an "L-" prefix

* Address remarks

* Nit change

---------

Co-authored-by: Romazes <[email protected]>
  • Loading branch information
Marinovsky and Romazes committed Jul 16, 2024
1 parent 982d259 commit ac5f26a
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 54 deletions.
6 changes: 1 addition & 5 deletions lean/commands/cloud/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def status(project: str) -> None:
cloud_project_manager = container.cloud_project_manager
cloud_project = cloud_project_manager.get_cloud_project(project, False)

live_algorithm = next((d for d in api_client.live.get_all() if d.projectId == cloud_project.projectId), None)
live_algorithm = api_client.live.get_project_by_id(cloud_project.projectId)

logger.info(f"Project id: {cloud_project.projectId}")
logger.info(f"Project name: {cloud_project.name}")
Expand Down Expand Up @@ -59,7 +59,3 @@ def status(project: str) -> None:

if live_algorithm.stopped is not None:
logger.info(f"Stopped: {live_algorithm.stopped.strftime('%Y-%m-%d %H:%M:%S')} UTC")

if live_algorithm.error != "":
logger.info("Error:")
logger.info(live_algorithm.error)
11 changes: 7 additions & 4 deletions lean/commands/live/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,16 +292,19 @@ def deploy(project: Path,

_start_iqconnect_if_necessary(lean_config, environment_name)

if not output.exists():
output.mkdir(parents=True)

if python_venv is not None and python_venv != "":
lean_config["python-venv"] = f'{"/" if python_venv[0] != "/" else ""}{python_venv}'

cash_balance_option, holdings_option, last_cash, last_holdings = get_last_portfolio_cash_holdings(container.api_client, brokerage_instance,
project_config.get("cloud-id", None), project)

if environment is None and brokerage is None and len(data_provider_live) == 0: # condition for using interactive panel
# We cannot create the output directory before calling get_last_portfolio_holdings, since then the most recently
# deployment would be always the local one (it has the current time in its name), and we would never be able to
# use the cash and holdings from a cloud deployment (see live_utils._get_last_portfolio() method)
if not output.exists():
output.mkdir(parents=True)

if environment is None and brokerage is None: # condition for using interactive panel
if cash_balance_option != LiveInitialStateInput.NotSupported:
live_cash_balance = _configure_initial_cash_interactively(logger, cash_balance_option, last_cash)

Expand Down
28 changes: 10 additions & 18 deletions lean/components/api/live_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,21 @@ def __init__(self, api_client: 'APIClient') -> None:
"""
self._api = api_client

def get_all(self,
status: Optional[QCLiveAlgorithmStatus] = None,
# Values less than 86400 cause errors on Windows: https://bugs.python.org/issue37527
start: datetime = datetime.fromtimestamp(86400),
end: datetime = datetime.now()) -> List[QCFullLiveAlgorithm]:
def get_project_by_id(self,
project_id: str) -> QCFullLiveAlgorithm:
"""Retrieves all live algorithms.
:param status: the status to filter by or None if no status filter should be applied
:param start: the earliest launch time the returned algorithms should have
:param end: the latest launch time the returned algorithms should have
:return: a list of live algorithms which match the given filters
:param project_id: the project id
:return: a live algorithm which match the given filters
"""
from math import floor
parameters = {
"start": floor(start.timestamp()),
"end": floor(end.timestamp())
}
parameters = {"projectId": project_id}
response = self._api.get("live/read", parameters)

if status is not None:
parameters["status"] = status.value
if response:
response["projectId"] = project_id
return QCFullLiveAlgorithm(**response)

data = self._api.get("live/read", parameters)
return [QCFullLiveAlgorithm(**algorithm) for algorithm in data["live"]]
return None

def start(self,
project_id: int,
Expand Down
71 changes: 55 additions & 16 deletions lean/components/util/live_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,41 @@
from lean.components.api.api_client import APIClient
from lean.components.util.logger import Logger
from lean.models.json_module import LiveInitialStateInput, JsonModule
from collections import UserDict


class InsensitiveCaseDict(UserDict):
def __getitem__(self, key: Any) -> Any:
if type(key) is str:
return super().__getitem__(key.lower())
return super().__getitem__(key)

def __setitem__(self, key: Any, item: Any) -> Any:
if type(key) is str:
self.data[key.lower()] = item
return
self.data[key] = item


def _get_last_portfolio(api_client: APIClient, project_id: str, project_name: Path) -> List[Dict[str, Any]]:
from pytz import utc, UTC
from os import listdir, path
from json import loads
from datetime import datetime

cloud_deployment_list = api_client.get("live/read")
cloud_deployment_time = [datetime.strptime(instance["launched"], "%Y-%m-%d %H:%M:%S").astimezone(UTC) for instance in cloud_deployment_list["live"]
if instance["projectId"] == project_id]
cloud_last_time = sorted(cloud_deployment_time, reverse = True)[0] if cloud_deployment_time else utc.localize(datetime.min)
cloud_last_time = utc.localize(datetime.min)
if project_id:
cloud_deployment = api_client.get("live/read", {"projectId": project_id})
if cloud_deployment["success"] and cloud_deployment["status"] != "Undefined":
if cloud_deployment["stopped"] is not None:
cloud_last_time = datetime.strptime(cloud_deployment["stopped"], "%Y-%m-%d %H:%M:%S")
else:
cloud_last_time = datetime.strptime(cloud_deployment["launched"], "%Y-%m-%d %H:%M:%S")
cloud_last_time = datetime(cloud_last_time.year, cloud_last_time.month,
cloud_last_time.day, cloud_last_time.hour,
cloud_last_time.minute,
cloud_last_time.second,
tzinfo=UTC)

local_last_time = utc.localize(datetime.min)
live_deployment_path = f"{project_name}/live"
Expand All @@ -38,14 +62,14 @@ def _get_last_portfolio(api_client: APIClient, project_id: str, project_name: Pa
local_last_time = sorted(local_deployment_time, reverse = True)[0]

if cloud_last_time > local_last_time:
last_state = api_client.get("live/read/portfolio", {"projectId": project_id})
last_state = api_client.get("live/portfolio/read", {"projectId": project_id})
previous_portfolio_state = last_state["portfolio"]
elif cloud_last_time < local_last_time:
from lean.container import container
output_directory = container.output_config_manager.get_latest_output_directory("live")
if not output_directory:
return None
previous_state_file = get_latest_result_json_file(output_directory)
previous_state_file = get_latest_result_json_file(output_directory, True)
if not previous_state_file:
return None
previous_portfolio_state = {x.lower(): y for x, y in loads(open(previous_state_file, "r", encoding="utf-8").read()).items()}
Expand All @@ -64,18 +88,29 @@ def get_last_portfolio_cash_holdings(api_client: APIClient, brokerage_instance:
:param project: the name of the project
:return: the options of initial cash/holdings setting, and the latest portfolio cash/holdings from the last deployment
"""
last_cash = []
last_holdings = []
from lean.container import container
last_cash = {}
last_holdings = {}
container.logger.debug(f'brokerage_instance: {brokerage_instance}')
cash_balance_option = brokerage_instance._initial_cash_balance
holdings_option = brokerage_instance._initial_holdings
container.logger.debug(f'cash_balance_option: {cash_balance_option}')
container.logger.debug(f'holdings_option: {holdings_option}')
if cash_balance_option != LiveInitialStateInput.NotSupported or holdings_option != LiveInitialStateInput.NotSupported:
last_portfolio = _get_last_portfolio(api_client, project_id, project)
last_cash = last_portfolio["cash"] if last_portfolio else None
last_holdings = last_portfolio["holdings"] if last_portfolio else None
if last_portfolio is not None:
for key, value in last_portfolio["cash"].items():
last_cash[key] = InsensitiveCaseDict(value)
for key, value in last_portfolio["holdings"].items():
last_holdings[key] = InsensitiveCaseDict(value)
last_holdings[key]["symbol"] = InsensitiveCaseDict(last_holdings[key]["symbol"])
else:
last_cash = None
last_holdings = None
return cash_balance_option, holdings_option, last_cash, last_holdings


def _configure_initial_cash_interactively(logger: Logger, cash_input_option: LiveInitialStateInput, previous_cash_state: List[Dict[str, Any]]) -> List[Dict[str, float]]:
def _configure_initial_cash_interactively(logger: Logger, cash_input_option: LiveInitialStateInput, previous_cash_state: Dict[str, Any]) -> List[Dict[str, float]]:
cash_list = []
previous_cash_balance = []
if previous_cash_state:
Expand Down Expand Up @@ -104,7 +139,7 @@ def _configure_initial_cash_interactively(logger: Logger, cash_input_option: Liv
return []


def configure_initial_cash_balance(logger: Logger, cash_input_option: LiveInitialStateInput, live_cash_balance: str, previous_cash_state: List[Dict[str, Any]])\
def configure_initial_cash_balance(logger: Logger, cash_input_option: LiveInitialStateInput, live_cash_balance: str, previous_cash_state: Dict[str, Any])\
-> List[Dict[str, float]]:
"""Interactively configures the intial cash balance.
Expand All @@ -124,7 +159,7 @@ def configure_initial_cash_balance(logger: Logger, cash_input_option: LiveInitia
return _configure_initial_cash_interactively(logger, cash_input_option, previous_cash_state)


def _configure_initial_holdings_interactively(logger: Logger, holdings_option: LiveInitialStateInput, previous_holdings: List[Dict[str, Any]]) -> List[Dict[str, float]]:
def _configure_initial_holdings_interactively(logger: Logger, holdings_option: LiveInitialStateInput, previous_holdings: Dict[str, Any]) -> List[Dict[str, float]]:
holdings = []
last_holdings = []
if previous_holdings:
Expand Down Expand Up @@ -156,7 +191,7 @@ def _configure_initial_holdings_interactively(logger: Logger, holdings_option: L
return []


def configure_initial_holdings(logger: Logger, holdings_option: LiveInitialStateInput, live_holdings: str, previous_holdings: List[Dict[str, Any]])\
def configure_initial_holdings(logger: Logger, holdings_option: LiveInitialStateInput, live_holdings: str, previous_holdings: Dict[str, Any])\
-> List[Dict[str, float]]:
"""Interactively configures the intial portfolio holdings.
Expand All @@ -176,7 +211,7 @@ def configure_initial_holdings(logger: Logger, holdings_option: LiveInitialState
return _configure_initial_holdings_interactively(logger, holdings_option, previous_holdings)


def get_latest_result_json_file(output_directory: Path) -> Optional[Path]:
def get_latest_result_json_file(output_directory: Path, is_live_trading: bool = False) -> Optional[Path]:
from lean.container import container

output_config_manager = container.output_config_manager
Expand All @@ -185,7 +220,11 @@ def get_latest_result_json_file(output_directory: Path) -> Optional[Path]:
if output_id is None:
return None

result_file = output_directory / f"{output_id}.json"
prefix = ""
if is_live_trading:
prefix = "L-"

result_file = output_directory / f"{prefix}{output_id}.json"
if not result_file.exists():
return None

Expand Down
4 changes: 1 addition & 3 deletions lean/models/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class QCMinimalLiveAlgorithm(WrappedBaseModel):
def get_url(self) -> str:
"""Returns the url of the live deployment in the cloud.
:return: a url which when visited opens an Algorithm Lab tab containing the live deployment
:return: an url which when visited opens an Algorithm Lab tab containing the live deployment
"""
return f"https://www.quantconnect.com/project/{self.projectId}/live"

Expand All @@ -291,8 +291,6 @@ class QCFullLiveAlgorithm(QCMinimalLiveAlgorithm):
launched: datetime
stopped: Optional[datetime]
brokerage: str
subscription: str
error: str


class QCEmailNotificationMethod(WrappedBaseModel):
Expand Down
30 changes: 25 additions & 5 deletions tests/commands/cloud/live/test_cloud_live_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ def test_cloud_live_deploy() -> None:

api_client = mock.Mock()
api_client.nodes.get_all.return_value = create_qc_nodes()
api_client.get.return_value = {'portfolio': {"cash": {}}, 'live': []}
api_client.get.return_value = {
"status": "stopped",
"stopped": "2024-07-10 19:12:20",
"success": True,
"portfolio": {"holdings": {}, "cash": {}, "success": True}}
container.api_client = api_client

cloud_project_manager = mock.Mock()
Expand Down Expand Up @@ -98,7 +102,11 @@ def test_cloud_live_deploy_with_ib_using_hybrid_datafeed() -> None:

api_client = mock.Mock()
api_client.nodes.get_all.return_value = create_qc_nodes()
api_client.get.return_value = {'portfolio': {"cash": {}}, 'live': []}
api_client.get.return_value = {
"status": "stopped",
"stopped": "2024-07-10 19:12:20",
"success": True,
"portfolio": {"holdings": {}, "cash": {}, "success": True}}
container.api_client = api_client

cloud_project_manager = mock.Mock()
Expand Down Expand Up @@ -155,7 +163,11 @@ def test_cloud_live_deploy_with_notifications(notice_method: str, configs: str)

api_client = mock.Mock()
api_client.nodes.get_all.return_value = create_qc_nodes()
api_client.get.return_value = {'portfolio': {"cash": {}}, 'live': []}
api_client.get.return_value = {
"status": "stopped",
"stopped": "2024-07-10 19:12:20",
"success": True,
"portfolio": {"holdings": {}, "cash": {}, "success": True}}
container.api_client = api_client

cloud_project_manager = mock.Mock()
Expand Down Expand Up @@ -238,7 +250,11 @@ def test_cloud_live_deploy_with_live_cash_balance(brokerage: str, cash: str) ->

api_client = mock.Mock()
api_client.nodes.get_all.return_value = create_qc_nodes()
api_client.get.return_value = {'live': [], 'portfolio': {}}
api_client.get.return_value = {
"status": "stopped",
"stopped": "2024-07-10 19:12:20",
"success": True,
"portfolio": {"cash": {}, "holdings": {}}}
container.api_client = api_client

cloud_runner = mock.Mock()
Expand Down Expand Up @@ -315,7 +331,11 @@ def test_cloud_live_deploy_with_live_holdings(brokerage: str, holdings: str) ->

api_client = mock.Mock()
api_client.nodes.get_all.return_value = create_qc_nodes()
api_client.get.return_value = {'live': [], 'portfolio': {}}
api_client.get.return_value = {
"status": "stopped",
"stopped": "2024-07-10 19:12:20",
"success": True,
"portfolio": {"cash": {}, "holdings": {}}}
container.api_client = api_client

cloud_runner = mock.Mock()
Expand Down
6 changes: 3 additions & 3 deletions tests/commands/test_live.py
Original file line number Diff line number Diff line change
Expand Up @@ -1182,14 +1182,14 @@ def test_live_deploy_with_different_brokerage_and_different_live_data_provider_a

is_exists = []
if brokerage_product_id is None and data_provider_historical_name != "Local":
assert len(api_client.method_calls) == 3
assert len(api_client.method_calls) == 2
for m_c, id in zip(api_client.method_calls, [data_provider_live_product_id, data_provider_historical_id]):
if id in m_c[1]:
is_exists.append(True)
assert is_exists
assert len(is_exists) == 2
elif brokerage_product_id is None and data_provider_historical_name == "Local":
assert len(api_client.method_calls) == 2
assert len(api_client.method_calls) == 1
if data_provider_live_product_id in api_client.method_calls[0][1]:
is_exists.append(True)
assert is_exists
Expand Down Expand Up @@ -1243,7 +1243,7 @@ def test_live_non_interactive_deploy_paper_brokerage_different_live_data_provide
api_client = mock.MagicMock()
create_lean_option(brokerage_name, data_provider_live_name, None, api_client)

assert len(api_client.method_calls) == 2
assert len(api_client.method_calls) == 1
for m_c in api_client.method_calls:
if data_provider_live_product_id in m_c[1]:
is_exist = True
Expand Down

0 comments on commit ac5f26a

Please sign in to comment.