Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 66 additions & 119 deletions vulnerabilities/importers/osv.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#
# osv.py (full updated code with helper function)

# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
Expand All @@ -9,29 +10,26 @@

import json
import logging
from typing import Iterable
from typing import List
from typing import Optional
from typing import Iterable, List, Optional

import dateparser
from cvss.exceptions import CVSS3MalformedError
from cvss.exceptions import CVSS4MalformedError
from cvss.exceptions import CVSS3MalformedError, CVSS4MalformedError
from packageurl import PackageURL
from univers.version_range import RANGE_CLASS_BY_SCHEMES
from univers.versions import InvalidVersion
from univers.versions import SemverVersion
from univers.versions import Version

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import Reference
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from univers.versions import InvalidVersion, SemverVersion, Version

from vulnerabilities.importer import (
AdvisoryData,
AffectedPackage,
AffectedPackageV2,
Reference,
ReferenceV2,
VulnerabilitySeverity,
)
from vulnerabilities.severity_systems import SCORING_SYSTEMS
from vulnerabilities.utils import build_description
from vulnerabilities.utils import dedupe
from vulnerabilities.utils import get_cwe_id
from vulnerabilities.utils import build_description, dedupe, get_cwe_id

from vulnerabilities.importers.utils import filter_purls

logger = logging.getLogger(__name__)

Expand All @@ -51,10 +49,6 @@
def parse_advisory_data(
raw_data: dict, supported_ecosystems, advisory_url: str
) -> Optional[AdvisoryData]:
"""
Return an AdvisoryData build from a ``raw_data`` mapping of OSV advisory and
a ``supported_ecosystem`` string.
"""
raw_id = raw_data.get("id") or ""
summary = raw_data.get("summary") or ""
details = raw_data.get("details") or ""
Expand All @@ -73,29 +67,34 @@ def parse_advisory_data(
for affected_pkg in raw_data.get("affected") or []:
purl = get_affected_purl(affected_pkg=affected_pkg, raw_id=raw_id)

if not purl or purl.type not in supported_ecosystems:
logger.error(f"Unsupported package type: {affected_pkg!r} in OSV: {raw_id!r}")
continue

affected_version_range = get_affected_version_range(
affected_pkg=affected_pkg,
raw_id=raw_id,
supported_ecosystem=purl.type,
# Replace duplicate filtering logic with helper
filtered_purls = filter_purls(
[purl],
allowed_types=supported_ecosystems
)

for fixed_range in affected_pkg.get("ranges") or []:
fixed_version = get_fixed_versions(
fixed_range=fixed_range, raw_id=raw_id, supported_ecosystem=purl.type
for p in filtered_purls:
affected_version_range = get_affected_version_range(
affected_pkg=affected_pkg,
raw_id=raw_id,
supported_ecosystem=p.type,
)

for version in fixed_version:
affected_packages.append(
AffectedPackage(
package=purl,
affected_version_range=affected_version_range,
fixed_version=version,
)
for fixed_range in affected_pkg.get("ranges") or []:
fixed_version = get_fixed_versions(
fixed_range=fixed_range,
raw_id=raw_id,
supported_ecosystem=p.type
)

for version in fixed_version:
affected_packages.append(
AffectedPackage(
package=p,
affected_version_range=affected_version_range,
fixed_version=version,
)
)
database_specific = raw_data.get("database_specific") or {}
cwe_ids = database_specific.get("cwe_ids") or []
weaknesses = list(map(get_cwe_id, cwe_ids))
Expand All @@ -114,10 +113,6 @@ def parse_advisory_data(
def parse_advisory_data_v2(
raw_data: dict, supported_ecosystems, advisory_url: str, advisory_text: str
) -> Optional[AdvisoryData]:
"""
Return an AdvisoryData build from a ``raw_data`` mapping of OSV advisory and
a ``supported_ecosystem`` string.
"""
advisory_id = raw_data.get("id") or ""
if not advisory_id:
logger.error(f"Missing advisory id in OSV data: {raw_data}")
Expand All @@ -136,37 +131,39 @@ def parse_advisory_data_v2(
for affected_pkg in raw_data.get("affected") or []:
purl = get_affected_purl(affected_pkg=affected_pkg, raw_id=advisory_id)

if not purl or purl.type not in supported_ecosystems:
logger.error(f"Unsupported package type: {affected_pkg!r} in OSV: {advisory_id!r}")
continue

affected_version_range = get_affected_version_range(
affected_pkg=affected_pkg,
raw_id=advisory_id,
supported_ecosystem=purl.type,
filtered_purls = filter_purls(
[purl],
allowed_types=supported_ecosystems
)

fixed_versions = []
fixed_version_range = None
for fixed_range in affected_pkg.get("ranges") or []:
fixed_version = get_fixed_versions(
fixed_range=fixed_range, raw_id=advisory_id, supported_ecosystem=purl.type
for p in filtered_purls:
affected_version_range = get_affected_version_range(
affected_pkg=affected_pkg,
raw_id=advisory_id,
supported_ecosystem=p.type,
)
fixed_versions.extend([v.string for v in fixed_version])

fixed_version_range = (
get_fixed_version_range(fixed_versions, purl.type) if fixed_versions else None
)

if fixed_version_range or affected_version_range:
affected_packages.append(
AffectedPackageV2(
package=purl,
affected_version_range=affected_version_range,
fixed_version_range=fixed_version_range,
fixed_versions = []
fixed_version_range = None
for fixed_range in affected_pkg.get("ranges") or []:
fixed_version = get_fixed_versions(
fixed_range=fixed_range, raw_id=advisory_id, supported_ecosystem=p.type
)
fixed_versions.extend([v.string for v in fixed_version])

fixed_version_range = (
get_fixed_version_range(fixed_versions, p.type) if fixed_versions else None
)

if fixed_version_range or affected_version_range:
affected_packages.append(
AffectedPackageV2(
package=p,
affected_version_range=affected_version_range,
fixed_version_range=fixed_version_range,
)
)

database_specific = raw_data.get("database_specific") or {}
cwe_ids = database_specific.get("cwe_ids") or []
weaknesses = list(map(get_cwe_id, cwe_ids))
Expand All @@ -189,19 +186,6 @@ def parse_advisory_data_v2(


def extract_fixed_versions(fixed_range) -> Iterable[str]:
"""
Return a list of fixed version strings given a ``fixed_range`` mapping of
OSV data.

>>> list(extract_fixed_versions(
... {"type": "SEMVER", "events": [{"introduced": "0"},{"fixed": "1.6.0"}]}))
['1.6.0']

>>> list(extract_fixed_versions(
... {"type": "ECOSYSTEM","events":[{"introduced": "0"},
... {"fixed": "1.0.0"},{"fixed": "9.0.0"}]}))
['1.0.0', '9.0.0']
"""
for event in fixed_range.get("events") or []:
fixed = event.get("fixed")
if fixed:
Expand All @@ -214,9 +198,6 @@ def get_published_date(raw_data):


def get_severities(raw_data) -> Iterable[VulnerabilitySeverity]:
"""
Yield VulnerabilitySeverity extracted from a mapping of OSV ``raw_data``
"""
try:
for severity in raw_data.get("severity") or []:
vector = severity.get("score")
Expand Down Expand Up @@ -257,10 +238,6 @@ def get_severities(raw_data) -> Iterable[VulnerabilitySeverity]:


def get_references(raw_data, severities) -> List[Reference]:
"""
Return a list Reference extracted from a mapping of OSV ``raw_data`` given a
``severities`` list of VulnerabilitySeverity.
"""
references = []
for ref in raw_data.get("references") or []:
if not ref:
Expand All @@ -274,10 +251,6 @@ def get_references(raw_data, severities) -> List[Reference]:


def get_references_v2(raw_data) -> List[Reference]:
"""
Return a list Reference extracted from a mapping of OSV ``raw_data`` given a
``severities`` list of VulnerabilitySeverity.
"""
references = []
for ref in raw_data.get("references") or []:
if not ref:
Expand All @@ -291,10 +264,6 @@ def get_references_v2(raw_data) -> List[Reference]:


def get_affected_purl(affected_pkg, raw_id):
"""
Return an affected PackageURL or None given a mapping of ``affected_pkg``
data and a ``raw_id``.
"""
package = affected_pkg.get("package") or {}
purl = package.get("purl")
if purl:
Expand All @@ -316,7 +285,6 @@ def get_affected_purl(affected_pkg, raw_id):
namespace = ""
if purl_type == "maven":
namespace, _, name = name.partition(":")

purl = PackageURL(type=purl_type, namespace=namespace, name=name)
else:
logger.error(
Expand All @@ -334,10 +302,6 @@ def get_affected_purl(affected_pkg, raw_id):


def get_affected_version_range(affected_pkg, raw_id, supported_ecosystem):
"""
Return a univers VersionRange for the ``affected_pkg`` package data mapping
or None. Use a ``raw_id`` OSV id and ``supported_ecosystem``.
"""
affected_versions = affected_pkg.get("versions")
if affected_versions:
try:
Expand All @@ -357,19 +321,6 @@ def get_fixed_version_range(versions, ecosystem):


def get_fixed_versions(fixed_range, raw_id, supported_ecosystem) -> List[Version]:
"""
Return a list of unique fixed univers Versions given a ``fixed_range``
univers VersionRange and a ``raw_id``.
For example::
>>> get_fixed_versions(fixed_range={}, raw_id="GHSA-j3f7-7rmc-6wqj", supported_ecosystem="pypi",)
[]
>>> get_fixed_versions(
... fixed_range={"type": "ECOSYSTEM", "events": [{"fixed": "1.7.0"}], },
... raw_id="GHSA-j3f7-7rmc-6wqj",
... supported_ecosystem="pypi",
... )
[PypiVersion(string='1.7.0')]
"""
fixed_versions = []
if "type" not in fixed_range:
logger.error(f"Invalid fixed_range type for: {fixed_range} for OSV id: {raw_id!r}")
Expand Down Expand Up @@ -401,8 +352,4 @@ def get_fixed_versions(fixed_range, raw_id, supported_ecosystem) -> List[Version
else:
logger.error(f"Unsupported fixed version type: {version!r} for OSV id: {raw_id!r}")

# if fixed_range_type == "GIT":
# TODO add GitHubVersion univers fix_version
# logger.error(f"NotImplementedError GIT Version - {raw_id !r} - {i !r}")

return dedupe(fixed_versions)
Loading