Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

Do not scan files tagged with av-status=DO_NOT_SCAN #175

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ the table below for reference.
| AV_SCAN_START_SNS_ARN | SNS topic ARN to publish notification about start of scan | | No |
| AV_SCAN_START_METADATA | The tag/metadata indicating the start of the scan | av-scan-start | No |
| AV_SIGNATURE_METADATA | The tag/metadata name representing file's AV type | av-signature | No |
| AV_STATUS_DO_NOT_SCAN | The value assigned to block scanning of items inside of tags/metadata | DO_NOT_SCAN | No |
| AV_STATUS_CLEAN | The value assigned to clean items inside of tags/metadata | CLEAN | No |
| AV_STATUS_INFECTED | The value assigned to clean items inside of tags/metadata | INFECTED | No |
| AV_STATUS_METADATA | The tag/metadata name representing file's AV status | av-status | No |
Expand Down
1 change: 1 addition & 0 deletions common.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
AV_SIGNATURE_METADATA = os.getenv("AV_SIGNATURE_METADATA", "av-signature")
AV_SIGNATURE_OK = "OK"
AV_SIGNATURE_UNKNOWN = "UNKNOWN"
AV_STATUS_DO_NOT_SCAN = os.getenv("AV_STATUS_DO_NOT_SCAN", "DO_NOT_SCAN")
AV_STATUS_CLEAN = os.getenv("AV_STATUS_CLEAN", "CLEAN")
AV_STATUS_INFECTED = os.getenv("AV_STATUS_INFECTED", "INFECTED")
AV_STATUS_METADATA = os.getenv("AV_STATUS_METADATA", "av-status")
Expand Down
21 changes: 21 additions & 0 deletions scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from common import AV_SCAN_START_SNS_ARN
from common import AV_SIGNATURE_METADATA
from common import AV_STATUS_CLEAN
from common import AV_STATUS_DO_NOT_SCAN
from common import AV_STATUS_INFECTED
from common import AV_STATUS_METADATA
from common import AV_STATUS_SNS_ARN
Expand Down Expand Up @@ -213,6 +214,14 @@ def lambda_handler(event, context):
print("Script starting at %s\n" % (start_time))
s3_object = event_object(event, event_source=EVENT_SOURCE)

if not object_have_to_be_scanned(s3_client, s3_object):
set_av_tags(s3_client, s3_object, AV_STATUS_DO_NOT_SCAN, AV_SIGNATURE_UNKNOWN, get_timestamp())
print(
"Skipp of s3://%s file is tagged DO_NOT_SCAN \n"
% (os.path.join(s3_object.bucket_name, s3_object.key))
)
return

if str_to_bool(AV_PROCESS_ORIGINAL_VERSION_ONLY):
verify_s3_object_version(s3, s3_object)

Expand Down Expand Up @@ -274,3 +283,15 @@ def lambda_handler(event, context):

def str_to_bool(s):
return bool(strtobool(str(s)))


# Determine if an object have to be scanned (tagged DO_NOT_CLEAN)
def object_have_to_be_scanned(s3_client, s3_object):
s3_object_tags = s3_client.get_object_tagging(Bucket=s3_object.bucket_name, Key=s3_object.key)
if "TagSet" not in s3_object_tags:
return True
for tag in s3_object_tags["TagSet"]:
if tag["Key"] in [AV_STATUS_METADATA] and tag["Value"] in [AV_STATUS_DO_NOT_SCAN]:
return False
return True

15 changes: 13 additions & 2 deletions scan_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from common import AV_TIMESTAMP_METADATA
from common import S3_ENDPOINT


# Get all objects in an S3 bucket that have not been previously scanned
def get_objects(s3_client, s3_bucket_name):

Expand All @@ -43,12 +42,24 @@ def get_objects(s3_client, s3_bucket_name):
for key in s3_list_objects_result["Contents"]:
key_name = key["Key"]
# Don't include objects that have been scanned
if not object_previously_scanned(s3_client, s3_bucket_name, key_name):
if not object_previously_scanned(s3_client, s3_bucket_name, key_name) and \
object_have_to_be_scanned(s3_client, s3_bucket_name, key_name):
s3_object_list.append(key_name)

return s3_object_list


# Determine if an object have to be scanned (tagged DO_NOT_CLEAN)
def object_have_to_be_scanned(s3_client, s3_bucket_name, key_name):
s3_object_tags = s3_client.get_object_tagging(Bucket=s3_bucket_name, Key=key_name)
if "TagSet" not in s3_object_tags:
return True
for tag in s3_object_tags["TagSet"]:
if tag["Key"] in [AV_STATUS_METADATA] and tag["Value"] in [AV_STATUS_DO_NOT_SCAN]:
return False
return True


# Determine if an object has been previously scanned for viruses
def object_previously_scanned(s3_client, s3_bucket_name, key_name):
s3_object_tags = s3_client.get_object_tagging(Bucket=s3_bucket_name, Key=key_name)
Expand Down