Skip to content

Upgrade subsystem overhaul #540

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions panos/__init__.py
Original file line number Diff line number Diff line change
@@ -127,6 +127,12 @@ def isstring(arg):
pan.DEBUG3 = pan.DEBUG2 - 1


def stringToVersion(other):
if isstring(other):
other = PanOSVersion(other)
return other


class PanOSVersion(LooseVersion):
"""LooseVersion with convenience properties to access version components"""

@@ -150,6 +156,10 @@ def patch(self):
def mainrelease(self):
return self.version[0:3]

@property
def minorrelease(self):
return self.version[0:2]

@property
def subrelease(self):
try:
@@ -174,6 +184,19 @@ def subrelease_num(self):
subrelease_num = None
return subrelease_num

@property
def baseimage(self):
# Account for lack of PAN-OS 7.0.0
if self.major == 7 and self.minor == 0:
base_patch = "1"
else:
base_patch = "0"
version_string = str(self)
version_tokens = version_string.split("-")[0].split(".")
version_tokens[2] = base_patch
base_image_string = ".".join(version_tokens)
return PanOSVersion(base_image_string)

def __repr__(self):
return "PanOSVersion ('%s')" % str(self)

@@ -221,12 +244,6 @@ def __ne__(self, other):
return not self.__eq__(other)


def stringToVersion(other):
if isstring(other):
other = PanOSVersion(other)
return other


def tree_legend_dot():
"""Create a graphviz dot string for a legend graph"""
modules = ["firewall", "policies", "objects", "network", "device", "panorama", "ha"]
221 changes: 161 additions & 60 deletions panos/updater.py
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@

"""Device updater handles software versions and updates for devices"""

from typing import Any, List, Literal, Optional, Union, cast

from pan.config import PanConfig

import panos.errors as err
@@ -151,7 +153,12 @@ def _parse_current_version(self, response_element):
self._logger.debug("Found current version: %s" % current_version)
return current_version

def download_install(self, version, load_config=None, sync=False):
def download_install(
self,
version: Union[str, PanOSVersion],
load_config: Optional[str] = None,
sync: bool = False,
) -> Any:
"""Download and install the requested PAN-OS version.
Like a combinations of the ``check()``, ``download()``, and
@@ -173,13 +180,11 @@ def download_install(self, version, load_config=None, sync=False):
If sync, returns result of PAN-OS install job
"""
if isstring(version):
version = PanOSVersion(version)
# Get list of software if needed
if not self.versions:
self.check()
# Get versions as StrictVersion objects
available_versions = map(PanOSVersion, self.versions.keys())
available_versions = list(map(PanOSVersion, self.versions.keys()))
target_version = PanOSVersion(str(version))
current_version = PanOSVersion(self.pandevice.version)

@@ -202,7 +207,12 @@ def download_install(self, version, load_config=None, sync=False):
result = self.install(target_version, load_config=load_config, sync=sync)
return result

def download_install_reboot(self, version, load_config=None, sync=False):
def download_install_reboot(
self,
version: Union[str, PanOSVersion],
load_config: Optional[str] = None,
sync: bool = False,
) -> Optional[str]:
"""Download and install the requested PAN-OS version, then reboot.
Like a combinations of the ``check()``, ``download()``, and
@@ -219,17 +229,16 @@ def download_install_reboot(self, version, load_config=None, sync=False):
err.PanDeviceError: problem found in pre-download checks or after reboot
"""
if isstring(version):
version = PanOSVersion(version)
self.download_install(version, load_config, sync=True)
target_version = PanOSVersion(str(version))
self.download_install(target_version, load_config, sync=True)
# Reboot the device
self._logger.info(
"Device %s is rebooting after upgrading to version %s. This will take a while."
"Device %s is rebooting after upgrading to version %s. This will take a while."
% (self.pandevice.id, version)
)
self.pandevice.restart()
if sync:
new_version = self.pandevice.syncreboot()
new_version: str = self.pandevice.syncreboot()
if version != new_version:
raise err.PanDeviceError(
"Attempt to upgrade to version %s failed."
@@ -241,14 +250,82 @@ def download_install_reboot(self, version, load_config=None, sync=False):
else:
return None

def upgrade_to_version(self, target_version, dryrun=False):
def _next_upgrade_version(
self,
target_version: Union[PanOSVersion, Literal["latest"]],
install_base: bool,
) -> PanOSVersion:
current_version = PanOSVersion(self.pandevice.version)
if target_version != "latest" and current_version == target_version:
return None
available_versions = list(map(PanOSVersion, self.versions.keys()))
latest_version = max(available_versions)
next_minor_version = self._next_minor_version(current_version)
if next_minor_version not in available_versions:
next_minor_version = None
if install_base:
if target_version == "latest":
return (
next_minor_version
if next_minor_version is not None
else latest_version
)
elif self._direct_upgrade_possible(
current_version, target_version, install_base
):
# No minor upgrade needed to target
return target_version
elif next_minor_version is None:
return latest_version
else:
return next_minor_version
else:
if target_version == "latest":
if next_minor_version is None:
return latest_version
else:
return self._latest_patch_version(
next_minor_version, available_versions
)
elif self._direct_upgrade_possible(
current_version, target_version, install_base
):
return target_version
else:
# More than one minor upgrade needed to target
return self._latest_patch_version(
next_minor_version, available_versions
)

def _current_version_is_target(
self, target_version: Union[PanOSVersion, Literal["latest"]]
) -> bool:
current_version = PanOSVersion(self.pandevice.version)
available_versions = list(map(PanOSVersion, self.versions.keys()))
latest_version = max(available_versions)
if target_version == "latest" and current_version == latest_version:
return True
elif current_version == target_version:
return True
else:
return False

def upgrade_to_version(
self,
target_version: Union[str, PanOSVersion],
dryrun: bool = False,
install_base: bool = True,
) -> List[str]:
"""Upgrade to the target version, completing all intermediate upgrades.
For example, if firewall is running version 9.0.5 and target version is 10.0.2,
then this method will proceed through the following steps:
- Download 9.1.0
- Upgrade to 9.1.0 and reboot
- Upgrade to 10.0.0 and reboot
- Download 10.0.0
- Upgrade to 10.0.0 and reboot (to skip this step, set `install_base` to False)
- Download 10.0.2
- Upgrade to 10.0.2 and reboot
Does not account for HA pairs.
@@ -259,13 +336,17 @@ def upgrade_to_version(self, target_version, dryrun=False):
from panos.firewall import Firewall
fw = Firewall("10.0.0.5", "admin", "password")
fw = Firewall("192.168.1.1", "admin", "password")
fw.software.upgrade_to_version("10.0.2")
Args:
target_version (string): PAN-OS version (eg. "10.0.2") or "latest"
dryrun (bool, optional): Log what steps would be taken, but don't
make any changes to the live device. Defaults to False.
install_base (bool, optional): The upgrade path will include an
upgrade to each base image (eg. 10.0.0) before upgrade to the
patch version. If this is False, the base image will download
but not install.
Raises:
err.PanDeviceError: any problem during the upgrade process
@@ -279,10 +360,11 @@ def upgrade_to_version(self, target_version, dryrun=False):
starting_version = self.pandevice.version

# Get versions as StrictVersion objects
available_versions = map(PanOSVersion, self.versions.keys())
target_is_latest = target_version == "latest"
target_version = (
PanOSVersion(str(target_version)) if not target_is_latest else "latest"
)
current_version = PanOSVersion(self.pandevice.version)
latest_version = max(available_versions)
next_minor_version = self._next_minor_version(current_version)

# Check that this is an upgrade, not a downgrade
if current_version > target_version:
@@ -291,66 +373,54 @@ def upgrade_to_version(self, target_version, dryrun=False):
% (self.pandevice.id, self.pandevice.version, target_version)
)

# Determine the next version to upgrade to
if target_version == "latest":
next_version = min(latest_version, next_minor_version)
elif latest_version < target_version:
next_version = next_minor_version
elif not self._direct_upgrade_possible(current_version, target_version):
next_version = next_minor_version
else:
next_version = PanOSVersion(str(target_version))

if next_version not in available_versions and not dryrun:
self._logger.info(
"Device %s upgrading to %s, currently on %s. Checking for newer versions."
% (self.pandevice.id, target_version, self.pandevice.version)
)
self.check()
available_versions = map(PanOSVersion, self.versions.keys())
latest_version = max(available_versions)

# Check if done upgrading
if current_version == target_version:
if self._current_version_is_target(target_version):
self._logger.info(
"Device %s is running target version: %s"
% (self.pandevice.id, target_version)
)
return True
elif target_version == "latest" and current_version == latest_version:
self._logger.info(
"Device %s is running latest version: %s"
% (self.pandevice.id, latest_version)
% (self.pandevice.id, current_version)
)
if dryrun:
self._logger.info(
"NOTE: dryrun with 'latest' does not show all upgrades,"
)
self._logger.info(
"as new versions are learned through the upgrade process,"
"NOTE: dryrun with 'latest' does not show all upgrades, as new versions are learned through the upgrade process, so results may be different than dryrun output when using 'latest'."
)
return [str(current_version)]

# Determine the next version to upgrade to
next_version = self._next_upgrade_version(target_version, install_base)

# Download base image if needed
if (
not install_base
and not self.versions[str(next_version.baseimage)]["downloaded"]
):
if dryrun:
self._logger.info(
"so results may be different than dryrun output when using 'latest'."
"Device %s will download base image: %s"
% (self.pandevice.id, next_version.baseimage)
)
return True
else:
self.download(next_version.baseimage, sync=True)

# Ensure the content pack is upgraded to the latest
self.pandevice.content.download_and_install_latest(sync=True)
if not dryrun:
self.pandevice.content.download_and_install_latest(sync=True)

# Upgrade to the next version
self._logger.info(
"Device %s will be upgraded to version: %s"
"Device %s will download and upgrade to version: %s"
% (self.pandevice.id, next_version)
)
if dryrun:
self.pandevice.version = str(next_version)
else:
self.download_install_reboot(next_version, sync=True)
self.check()
result = self.upgrade_to_version(target_version, dryrun=dryrun)
result = self.upgrade_to_version(
target_version, dryrun=dryrun, install_base=install_base
)
if result and dryrun:
self.pandevice.version = starting_version
return result
return [str(current_version)] + result

def _next_major_version(self, version):
if isstring(version):
@@ -361,12 +431,15 @@ def _next_major_version(self, version):
next_version = PanOSVersion("7.0.1")
return next_version

def _next_minor_version(self, version):
def _next_minor_version(self, version: Union[PanOSVersion, str]) -> PanOSVersion:
from panos.firewall import Firewall

if isstring(version):
next_version = PanOSVersion(version)
if version.minor == 1:
version = PanOSVersion(str(version))

# Account for 10.2.x (only release with minor version of '2')
if version.major == 10 and version.minor == 1:
next_version = PanOSVersion("10.2.0")
elif version.minor > 0:
next_version = PanOSVersion(str(version.major + 1) + ".0.0")
# There is no PAN-OS 5.1 for firewalls, so next minor release from 5.0.x is 6.0.0.
elif (
@@ -390,7 +463,22 @@ def _next_patch_version(self, version):
)
return next_version

def _direct_upgrade_possible(self, current_version, target_version):
def _latest_patch_version(
self, version: Union[str, PanOSVersion], available_versions: List[PanOSVersion]
):
if isstring(version):
version = PanOSVersion(version)
found_patch = False
latest_patch: PanOSVersion = PanOSVersion("0.0.0")
for v in available_versions:
if v.major == version.major and v.minor == version.minor:
latest_patch = max(latest_patch, v)
found_patch = True
return latest_patch if found_patch else None

def _direct_upgrade_possible(
self, current_version, target_version, install_base=True
):
"""Check if current version can directly upgrade to target version
:returns True if a direct upgrade is possible, False if not
@@ -415,7 +503,7 @@ def _direct_upgrade_possible(self, current_version, target_version):
current_version.major == target_version.major
and current_version.minor == 0
and target_version.minor == 1
and target_version.patch == 0
and (not install_base or target_version.patch == 0)
):
return True

@@ -425,10 +513,12 @@ def _direct_upgrade_possible(self, current_version, target_version):
current_version.major + 1 == target_version.major
and current_version.minor == 1
and target_version.minor == 0
and target_version.patch == 0
and (not install_base or target_version.patch == 0)
):
return True

# SPECIAL CASES

# Upgrading a firewall from PAN-OS 5.0.x to 6.0.x
# This is a special case because there is no PAN-OS 5.1.x
from panos.firewall import Firewall
@@ -441,6 +531,17 @@ def _direct_upgrade_possible(self, current_version, target_version):
):
return True

# Upgrade from PAN-OS 10.1.x to 10.2.x
# This is a special case because only minor release with a 2
if (
current_version.major == 10
and current_version.minor == 1
and target_version.major == 10
and target_version.minor == 2
and (not install_base or target_version.patch == 0)
):
return True

return False


112 changes: 112 additions & 0 deletions tests/test_updater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
try:
from unittest import mock
except ImportError:
import mock

from panos import PanOSVersion
from panos.firewall import Firewall


def _fw(version):
fw = Firewall("127.0.0.1", "admin", "admin", "secret")
fw._set_version_and_version_info(version)
return fw


def _updater_fw_setup(*args):
fw = _fw()

return fw


def versionStrToTuple(version_string):
tokens = version_string.split(".")[:3]
tokens[2] = tokens[2].split("-")[0]
return tuple(int(x) for x in tokens)


def _create_mock_check(fw):
patches = range(5)

def mock_check():
version_info = versionStrToTuple(fw.version)
current_minor = ".".join(map(lambda x: str(x), version_info[0:-1]))
if version_info[1] == 0:
next_minor = ".".join([str(version_info[0]), "1"])
else:
next_minor = ".".join([str(version_info[0] + 1), "0"])
versions = [".".join((str(current_minor), str(patch))) for patch in patches]
versions += [".".join((str(next_minor), str(patch))) for patch in patches]
fw.software.versions = {version: {"downloaded": False} for version in versions}

return mock_check


def _create_mock_download_install_reboot(fw):
def mock_download_install_reboot(next_version, sync):
fw.version = str(next_version)
return next_version

return mock_download_install_reboot


def test_upgrade_to_version_with_install_base():
fw = _fw("8.0.2")

fw.software.check = mock.Mock(side_effect=_create_mock_check(fw))
fw.software.download_install_reboot = mock.Mock(
side_effect=_create_mock_download_install_reboot(fw)
)
fw.content.download_and_install_latest = mock.Mock()
fw.software.download = mock.Mock()

result = fw.software.upgrade_to_version("10.1.3")
assert result == ["8.0.2", "8.1.0", "9.0.0", "9.1.0", "10.0.0", "10.1.0", "10.1.3"]


def test_upgrade_to_version_without_install_base():
fw = _fw("8.0.2")

fw.software.check = mock.Mock(side_effect=_create_mock_check(fw))
fw.software.download_install_reboot = mock.Mock(
side_effect=_create_mock_download_install_reboot(fw)
)
fw.content.download_and_install_latest = mock.Mock()
fw.software.download = mock.Mock()

result = fw.software.upgrade_to_version("10.1.3", install_base=False)
assert result == ["8.0.2", "8.1.4", "9.0.4", "9.1.4", "10.0.4", "10.1.3"]


def test_next_upgrade_version_with_10_2_with_install_base():
fw = _fw("10.1.3")
fw.software.versions = {
"10.1.0": "",
"10.1.1": "",
"10.1.2": "",
"10.1.3": "",
"10.1.4": "",
"10.2.0": "",
"10.2.1": "",
"10.2.2": "",
"10.2.3": "",
}
result = fw.software._next_upgrade_version("11.0.2", install_base=True)
assert result == PanOSVersion("10.2.0")


def test_next_upgrade_version_with_10_2_without_install_base():
fw = _fw("10.1.3")
fw.software.versions = {
"10.1.0": "",
"10.1.1": "",
"10.1.2": "",
"10.1.3": "",
"10.1.4": "",
"10.2.0": "",
"10.2.1": "",
"10.2.2": "",
"10.2.3": "",
}
result = fw.software._next_upgrade_version("11.0.2", install_base=False)
assert result == PanOSVersion("10.2.3")