Added Support for Debian Packages. #89

Draft · wants to merge 8 commits into master

Changes from all commits
1 change: 1 addition & 0 deletions requirements.txt
@@ -22,6 +22,7 @@ extractcode-libarchive==3.5.1.210531
 fasteners==0.17.3
 fingerprints==1.0.3
 ftfy==6.0.3
+ftputil==5.0.4
 future==0.18.2
 gemfileparser==0.8.0
 html5lib==1.1
3 changes: 3 additions & 0 deletions setup.cfg
@@ -54,6 +54,9 @@ install_requires =
     attrs
     packageurl-python
     requests
+    debian-inspector
+    ftputil
+    extractcode[full]


 [options.packages.find]
16 changes: 13 additions & 3 deletions src/fetchcode/__init__.py
@@ -24,7 +24,7 @@


 class Response:
-    def __init__(self, location, content_type, size, url):
+    def __init__(self, location, content_type, size, url, success=True):
         """
         Represent the response from fetching a URL with:
         - `location`: the absolute location of the file that was fetched
@@ -36,22 +36,32 @@ def __init__(self, location, content_type, size, url):
         self.size = size
         self.content_type = content_type
         self.location = location
+        self.success = success


 def fetch_http(url, location):
     """
     Return a `Response` object built from fetching the content at an HTTP/HTTPS
     `url` URL string, saving the content in a file at `location`.
     """
-    r = requests.get(url)
+    try:
+        r = requests.get(url)
+    except requests.exceptions.ConnectionError as e:
+        raise Exception(f"Failed to fetch: {url}") from e
+
+    if r.status_code != 200:
+        success = False
+    else:
+        success = True

     with open(location, 'wb') as f:
         f.write(r.content)

     content_type = r.headers.get('content-type')
     size = r.headers.get('content-length')
     size = int(size) if size else None

-    resp = Response(location=location, content_type=content_type, size=size, url=url)
+    resp = Response(location=location, content_type=content_type, size=size, url=url, success=success)

     return resp
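
As a usage sketch of the new `success` flag (the URL and local path below are made up for illustration):

```python
from fetchcode import fetch_http

# On a connection failure fetch_http() raises; on a non-200 response it still
# writes the body to `location` but reports success=False for callers to check.
response = fetch_http("https://ftp.debian.org/debian/README", "/tmp/README")
if response.success:
    print(response.location, response.content_type, response.size)
```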
195 changes: 195 additions & 0 deletions src/fetchcode/ls.py
@@ -0,0 +1,195 @@
+#
+# Copyright (c) nexB Inc. and others. All rights reserved.
+# purldb is a trademark of nexB Inc.
+# SPDX-License-Identifier: Apache-2.0
+# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
+# See https://github.com/nexB/purldb for support or download.
+# See https://aboutcode.org for more information about nexB OSS projects.
+#
+
+
+from datetime import datetime
+from functools import total_ordering
+import logging
+import posixpath
+import stat
+
+from ftputil.stat import UnixParser
+from ftputil.error import ParserError
+
+
+TRACE = False
+
+logger = logging.getLogger(__name__)
+if TRACE:
+    import sys
+    logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
+    logger.setLevel(logging.DEBUG)
+
+"""
+Parse directory listings such as a find or ls command output.
+These are commonly provided as file indexes in package repositories.
+"""
+
+# TODO: use constants for entry types
+DIR = 'd'
+FILE = 'f'
+LINK = 'l'
+SPECIAL = 's'
+
+# FIXME: do we really need link and special file support?
+
+
+@total_ordering
+class Entry(object):
+    """
+    Represent a file, directory or link entry in a directory listing.
+    """
+    __slots__ = 'path', 'type', 'size', 'date', 'target'
+
+    def __init__(self, path=None, type=None, size=None, date=None, target=None):  # NOQA
+        self.path = path
+        self.type = type
+        self.size = size
+        self.date = date
+        self.target = target
+        if TRACE:
+            logger.debug('Entry(): ' + repr(self))
+
+    def __repr__(self):
+        base = 'Entry(path=%(path)r, type=%(type)r, size=%(size)r, date=%(date)r'
+        link_target = ')'
+        if self.type == LINK:
+            link_target = ', target=%(target)r)'
+        return (base + link_target) % self.to_dict()
+
+    def __eq__(self, other):
+        return isinstance(other, Entry) and self.to_dict() == other.to_dict()
+
+    def __lt__(self, other):
+        return isinstance(other, Entry) and tuple(self.to_dict().items()) < tuple(other.to_dict().items())
+
+    def __hash__(self):
+        return hash(tuple(self.to_dict().items()))
+
+    def to_dict(self):
+        return {
+            'path': self.path,
+            'type': self.type,
+            'size': self.size,
+            'date': self.date,
+            'target': self.target,
+        }
+
+    @classmethod
+    def from_stat(cls, stat_result, base_dir='', use_utc_time=True):
+        """
+        Return a new Entry built from a stat-like tuple and a base
+        directory.
+        """
+        res_type = None
+        path = stat_result._st_name
+        path = clean_path(path)
+
+        # ignore date and size unless this is a file
+        date = None
+        size = 0
+
+        target = None
+        mode = stat_result.st_mode
+
+        if stat.S_ISREG(mode):
+            res_type = FILE
+            if use_utc_time:
+                utc_date = datetime.utcfromtimestamp(stat_result.st_mtime)
+            else:
+                utc_date = datetime.fromtimestamp(stat_result.st_mtime)
+            date = datetime.isoformat(utc_date)[:10]
+            size = stat_result.st_size
+
+        elif stat.S_ISDIR(mode):
+            res_type = DIR
+
+        elif stat.S_ISLNK(mode):
+            res_type = LINK
+            target = stat_result._st_target
+
+        else:
+            # anything else is some special file of sorts
+            res_type = SPECIAL
+
+        # rejoin the path with the base dir, if any
+        if base_dir and base_dir != '.':
+            base_dir = clean_path(base_dir)
+            path = posixpath.join(base_dir, path)
+
+        return cls(path, res_type, size, date, target)
+
+
+def clean_path(path):
+    """
+    Return a path cleaned from leading and trailing slashes and a leading "./".
+    """
+    path = path.strip().strip('/')
+    if path.startswith('./'):
+        path = path[2:]
+    return path.strip()
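
A quick sketch of what `clean_path()` returns (inputs made up for illustration):

```python
from fetchcode.ls import clean_path

print(clean_path('/pool/main/a/attr/'))  # pool/main/a/attr
print(clean_path('./tools'))             # tools
```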


+def remove_inode(line):
+    """
+    Return the line with the leading inode number and size in blocks
+    (which are numbers separated by spaces) removed.
+    """
+    _, _, line = line.strip().partition(' ')
+    _, _, line = line.strip().partition(' ')
+    return line.strip()
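
And a sketch of `remove_inode()` on one made-up line of "find -ls" output:

```python
from fetchcode.ls import remove_inode

line = "1104465    4 drwxr-xr-x   2 root root 4096 Mar  1 14:32 ./tools"
# Drop the leading inode number and block count, keeping the ls-style fields.
print(remove_inode(line))  # drwxr-xr-x   2 root root 4096 Mar  1 14:32 ./tools
```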


+def parse_directory_listing(dir_listing, from_find=False):
+    """
+    Yield Entry objects from a `dir_listing` directory listing text.
+
+    If `from_find` is True, the directory listing is assumed to come from a
+    "find -ls" command. Otherwise it is assumed to come from an "ls -alR"
+    command.
+
+    For "find -ls", all lines start with an inode number, i.e. a sequence of
+    digits. Note: the "find -ls" output is similar to the "ls -ils" format
+    (except for paths): an inode number and a size in blocks prefix each
+    listing line.
+    """
+    lines = dir_listing.splitlines()
+    parser = UnixParser()
+
+    # default in case this is not a recursive listing: we always need a base dir
+    base_dir = ''
+    for ln, line in enumerate(lines, 1):
+        line = line.strip()
+        if parser.ignores_line(line):
+            continue
+
+        if from_find:
+            line = remove_inode(line)
+
+        file_stat = None
+        try:
+            file_stat = parser.parse_line(line)
+            if TRACE:
+                logger.debug('parse_directory_listing:file_stat: ' + repr(file_stat))
+                dt = datetime.utcfromtimestamp(file_stat.st_mtime)
+                dt = datetime.isoformat(dt)
+                logger.debug('parse_directory_listing:file_stat:date: ' + repr(dt))
+
+        except ParserError as pe:
+            # this is likely a directory line from an "ls -alR" listing. Strip
+            # the trailing colon and keep track of the base directory.
+            if not line.endswith(':'):
+                raise Exception('Unknown directory listing line format: #%(ln)d: %(line)r' % locals()) from pe
+            base_dir = line.rstrip(':')
+            continue
+
+        if file_stat._st_name in ('.', '..'):
+            continue
+
+        entry = Entry.from_stat(file_stat, base_dir=base_dir, use_utc_time=False)
+        if entry:
+            yield entry
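
To see the parser end to end, here is a hedged sketch on a small made-up "ls -alR" style listing; the exact dates depend on how ftputil parses month/day/time stamps (it assumes the current year):

```python
from fetchcode.ls import parse_directory_listing

listing = """
.:
total 8
drwxr-xr-x   2 root root 4096 Mar  1 14:32 tools
-rw-r--r--   1 root root  123 Mar  1 14:32 README

./tools:
total 4
-rw-r--r--   1 root root  456 Mar  1 14:32 hello.c
"""

for entry in parse_directory_listing(listing):
    print(entry)
# Entry(path='tools', type='d', size=0, date=None)
# Entry(path='README', type='f', size=123, date='<current-year>-03-01')
# Entry(path='tools/hello.c', type='f', size=456, date='<current-year>-03-01')
```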
75 changes: 41 additions & 34 deletions src/fetchcode/package.py
@@ -13,15 +13,11 @@
 # under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 # CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.

-from attr import attrs, attrib

 from packageurl.contrib.route import NoRouteAvailable
 from packageurl import PackageURL
 from packageurl.contrib.route import Router
-import requests

 from fetchcode.packagedcode_models import Package
+from fetchcode.utils import *

 router = Router()

@@ -38,35 +34,6 @@ def info(url):
     return


-def get_response(url):
-    """
-    Generate `Package` object for a `url` string
-    """
-    resp = requests.get(url)
-    if resp.status_code == 200:
-        return resp.json()
-
-    raise Exception(f"Failed to fetch: {url}")
-
-
-def get_pypi_bugtracker_url(project_urls):
-    bug_tracking_url = project_urls.get("Tracker")
-    if not (bug_tracking_url):
-        bug_tracking_url = project_urls.get("Issue Tracker")
-    if not (bug_tracking_url):
-        bug_tracking_url = project_urls.get("Bug Tracker")
-    return bug_tracking_url
-
-
-def get_pypi_codeview_url(project_urls):
-    code_view_url = project_urls.get("Source")
-    if not (code_view_url):
-        code_view_url = project_urls.get("Code")
-    if not (code_view_url):
-        code_view_url = project_urls.get("Source Code")
-    return code_view_url
-
-
 @router.route("pkg:cargo/.*")
 def get_cargo_data_from_purl(purl):
     """
@@ -327,3 +294,43 @@ def get_rubygems_data_from_purl(purl):
         download_url=download_url,
         **purl.to_dict(),
     )


+@router.route("pkg:deb/.*")
+def get_debian_packages(purl):
+    """
+    Yield `Package` objects for a Debian `purl` string.
+    """
+    purl = PackageURL.from_string(purl)
+    name = purl.name
+    version = purl.version
+
+    # If no arch is provided, default to the source package, if available.
+    arch = purl.qualifiers.get("arch", "source")
+    base_path = "https://ftp.debian.org/debian/pool/main"
+
+    source = False
+
+    # The version and arch may also be packed into the name or version with
+    # underscores, as in a .deb file name.
+    name_parts = name.split("_")
+    version_parts = version.split("_") if version else []
+    if len(name_parts) == 3:
+        arch = name_parts[2]
+        version = name_parts[1]
+        name = name_parts[0]
+    elif len(version_parts) == 2:
+        arch = version_parts[1]
+        version = version_parts[0]
+
+    if arch == "source":
+        # The source tarball extension can be either .gz or .xz
+        package_name = f"{name}_{version}.debian.tar"
+        source = True
+    else:
+        # Debian binary package file names conform to the following convention:
+        # <foo>_<VersionNumber>-<DebianRevisionNumber>_<DebianArchitecture>.deb
+        package_name = f"{name}_{version}_{arch}.deb"
+
+    debian_processed_data = process_debian_data(package_name, source)
+
+    # FIXME: What to do when there are multiple licenses?
+    yield Package(
+        **debian_processed_data,
+        **purl.to_dict(),
+    )
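
To illustrate how a Debian purl is interpreted, a hedged usage sketch: the purls below are made up, and it assumes `process_debian_data()` from `fetchcode.utils` returns a mapping of `Package` fields (such as a `download_url` under the pool path above):

```python
from fetchcode.package import info

# With an `arch` qualifier, a binary .deb for that architecture is looked up.
# Without one, the source package is used instead.
for package in info("pkg:deb/debian/attr@2.4.47-2?arch=amd64"):
    print(package.download_url)

# The arch can also be packed into the name, following the .deb file naming
# convention <name>_<version>_<arch>:
for package in info("pkg:deb/debian/attr_2.4.47-2_amd64"):
    print(package.download_url)
```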