from os import environ
from pathlib import Path
import time
from typing import List, Dict, Any, Union, Tuple, Optional, TYPE_CHECKING
from pygit2 import DiffHunk # type: ignore
from ..loggers import logger
if TYPE_CHECKING:  # pragma: no cover
    # These names are needed for type annotations only; importing them at
    # runtime would cause a circular import.
    from ..clang_tools.clang_tidy import TidyAdvice
    from ..clang_tools.clang_format import FormatAdvice
#: A path to generated cache artifacts. (only used when verbosity is in debug mode)
#: The location is read from the ``CPP_LINTER_CACHE`` environment variable and
#: falls back to ``.cpp-linter_cache`` when that variable is not set.
CACHE_PATH = Path(environ.get("CPP_LINTER_CACHE", ".cpp-linter_cache"))
class FileObj:
    """A class to represent a single file being analyzed.

    :param name: The file name. This should use Unix style path delimiters (``/``),
        even on Windows.
    :param additions: A `list` of line numbers that have added changes in the diff.
        This value is used to populate the `lines_added` property.
    :param diff_chunks: The ranges that define the beginning and ending line numbers
        for all hunks in the diff.
    """

    def __init__(
        self,
        name: str,
        additions: Optional[List[int]] = None,
        diff_chunks: Optional[List[List[int]]] = None,
    ):
        self.name: str = name  #: The file name
        self.additions: List[int] = additions or []
        """A list of line numbers that contain added changes. This will be empty if
        not focusing on lines changed only."""
        self.diff_chunks: List[List[int]] = diff_chunks or []
        """A list of line numbers that define the beginning and ending of hunks in the
        diff. This will be empty if not focusing on lines changed only."""
        self.lines_added: List[List[int]] = FileObj._consolidate_list_to_ranges(
            self.additions
        )
        """A list of line numbers that define the beginning and ending of ranges that
        have added changes. This will be empty if not focusing on lines changed only.
        """
        #: The results from clang-tidy
        self.tidy_advice: Optional["TidyAdvice"] = None
        #: The results from clang-format
        self.format_advice: Optional["FormatAdvice"] = None

    def __repr__(self) -> str:
        return f"<FileObj {self.name} added:{self.additions} chunks:{self.diff_chunks}>"

    @staticmethod
    def _consolidate_list_to_ranges(numbers: List[int]) -> List[List[int]]:
        """A helper function that is only used after parsing the lines from a diff that
        contain additions.

        :param numbers: A `list` of integers representing the lines' numbers that
            contain additions.

        :returns: A consolidated sequence of lists. Each list will have 2 items
            describing the starting line (inclusive) and ending line (exclusive) of
            each run of consecutive line ``numbers``. An empty input yields an
            empty list.
        """
        ranges: List[List[int]] = []
        for n in numbers:
            # The exclusive end of the last range always equals "previous number + 1",
            # so `n` continues that run exactly when it matches the stored end.
            if ranges and ranges[-1][1] == n:
                ranges[-1][1] = n + 1
            else:
                ranges.append([n, n + 1])
        return ranges

    def range_of_changed_lines(
        self, lines_changed_only: int, get_ranges: bool = False
    ) -> Union[List[int], List[List[int]]]:
        """Assemble a list of lines changed.

        :param lines_changed_only: A flag to indicate the focus of certain lines.

            - ``0``: focuses on all lines in a file(s).
            - ``1``: focuses on any lines shown in the event's diff (may include
              unchanged lines).
            - ``2``: focuses strictly on lines in the diff that contain additions.
        :param get_ranges: A flag to return a list of sequences representing
            :py:class:`range` parameters. Defaults to `False` since this is only
            required when constructing clang-tidy or clang-format CLI arguments.

        :returns:
            If ``lines_changed_only`` is ``0``, then an empty list is returned.
            Otherwise, when ``get_ranges`` is asserted, a list of ranges is returned
            (`diff_chunks` for ``1``, `lines_added` for any other non-zero value).
            When ``get_ranges`` is not asserted, the flat `additions` list is
            returned.
        """
        if not lines_changed_only:
            # we return an empty list (instead of None) here so we can still iterate it
            return []  # type: ignore[return-value]
        if get_ranges:
            return self.diff_chunks if lines_changed_only == 1 else self.lines_added
        # NOTE(review): with ``lines_changed_only == 1`` this still yields only the
        # added lines rather than every line inside the diff chunks -- confirm that
        # this is intended before changing it.
        return self.additions

    def serialize(self) -> Dict[str, Any]:
        """For easy debugging, use this method to serialize the `FileObj` into a json
        compatible `dict`.

        :returns: A `dict` with the ``filename`` and a ``line_filter`` mapping that
            holds the ``diff_chunks`` and ``lines_added`` ranges.
        """
        line_filter = {
            "diff_chunks": self.diff_chunks,
            "lines_added": self.lines_added,
        }
        return {"filename": self.name, "line_filter": line_filter}

    def is_hunk_contained(self, hunk: DiffHunk) -> Optional[Tuple[int, int]]:
        """Does a given ``hunk`` start and end within a single diff hunk?

        This also includes some compensations for hunk headers that are oddly formed.

        .. tip:: This is mostly useful to create comments that can be posted within a
            git changes' diff. Ideally, designed for PR reviews based on patches
            generated by clang tools' output.

        :param hunk: The hunk whose ``old_start``, ``old_lines`` and ``new_start``
            attributes describe the lines to locate.

        :returns: The appropriate starting and ending line numbers of the given hunk.
            If hunk cannot fit in a single hunk, this returns `None`.
        """
        if hunk.old_lines > 0:
            start = hunk.old_start
            # span of old_lines is an inclusive range
            end = hunk.old_start + hunk.old_lines - 1
        else:  # if number of old lines is 0
            # start hunk at new line number and make it span 1 line
            start = end = hunk.new_start
        return self.is_range_contained(start, end)

    def is_range_contained(self, start: int, end: int) -> Optional[Tuple[int, int]]:
        """Does the given ``start`` and ``end`` line numbers fit within a single diff
        hunk?

        This is a helper function to `is_hunk_contained()`.

        .. tip:: This is mostly useful to create comments that can be posted within a
            git changes' diff. Ideally, designed for PR reviews based on patches
            generated by clang tools' output.

        :param start: The first line number of the span (inclusive).
        :param end: The last line number of the span (inclusive).

        :returns: The appropriate starting and ending line numbers of the given hunk.
            If hunk cannot fit in a single hunk, a warning is logged and this
            returns `None`.
        """
        for chunk in self.diff_chunks:
            # each chunk is treated as a half-open range: [chunk[0], chunk[1])
            chunk_range = range(chunk[0], chunk[1])
            if start in chunk_range and end in chunk_range:
                return (start, end)
        logger.warning(
            "lines %d - %d are not within a single diff hunk for file %s.",
            start,
            end,
            self.name,
        )
        return None

    def read_with_timeout(self, timeout_ns: int = 1_000_000_000) -> bytes:
        """Read the entire file's contents.

        :param timeout_ns: The number of nanoseconds to wait till timeout occurs.
            Defaults to 1 second.

        :returns: The bytes read from the file.

        :raises FileIOTimeout: When the operation did not succeed due to a timeout.
        :raises OSError: When the file could not be opened due to an `OSError`.
        """
        contents = b""
        success = False
        # The most recent OSError (if any); it takes precedence over a plain timeout.
        last_error: Optional[OSError] = None
        timeout = time.monotonic_ns() + timeout_ns
        while not success and time.monotonic_ns() < timeout:
            try:
                with open(self.name, "rb") as f:
                    while not success and time.monotonic_ns() < timeout:
                        if f.readable():
                            contents = f.read()
                            success = True
                        else:  # pragma: no cover
                            time.sleep(0.001)  # Sleep to prevent busy-waiting
            except OSError as exc:  # pragma: no cover
                last_error = exc
                # Back off before retrying so a persistent error does not spin the
                # CPU for the whole timeout.
                time.sleep(0.001)
        if not success:  # pragma: no cover
            if last_error is not None:
                raise last_error
            raise FileIOTimeout(
                f"Failed to read from file '{self.name}' within "
                + f"{round(timeout_ns / 1_000_000_000, 2)} seconds"
            )
        return contents

    def read_write_with_timeout(
        self,
        data: Union[bytes, bytearray],
        timeout_ns: int = 1_000_000_000,
    ) -> bytes:
        """Read then write the entire file's contents.

        :param data: The bytes to write to the file. This will overwrite the contents
            being read beforehand.
        :param timeout_ns: The number of nanoseconds to wait till timeout occurs.
            Defaults to 1 second.

        :returns: The bytes read from the file.

        :raises FileIOTimeout: When the operation did not succeed due to a timeout.
        :raises OSError: When the file could not be opened due to an `OSError`.
        """
        success = False
        # The most recent OSError (if any); it takes precedence over a plain timeout.
        last_error: Optional[OSError] = None
        original_data = b""
        timeout = time.monotonic_ns() + timeout_ns
        while not success and time.monotonic_ns() < timeout:
            try:
                with open(self.name, "r+b") as f:
                    while not success and time.monotonic_ns() < timeout:
                        if f.readable():
                            original_data = f.read()
                            # rewind so the new data replaces the old from the start
                            f.seek(0)
                        else:  # pragma: no cover
                            time.sleep(0.001)  # Sleep to prevent busy-waiting
                            continue
                        while not success and time.monotonic_ns() < timeout:
                            if f.writable():
                                f.write(data)
                                # drop any leftover bytes of a longer original
                                f.truncate()
                                success = True
                            else:  # pragma: no cover
                                time.sleep(0.001)  # Sleep to prevent busy-waiting
            except OSError as exc:  # pragma: no cover
                last_error = exc
                # Back off before retrying so a persistent error does not spin the
                # CPU for the whole timeout.
                time.sleep(0.001)
        if not success:  # pragma: no cover
            if last_error is not None:
                raise last_error
            raise FileIOTimeout(
                f"Failed to read then write file '{self.name}' within "
                + f"{round(timeout_ns / 1_000_000_000, 2)} seconds"
            )
        return original_data
class FileIOTimeout(Exception):
    """An exception thrown when a file operation timed out.

    Raised by `FileObj.read_with_timeout()` and
    `FileObj.read_write_with_timeout()` when the operation does not complete
    within the given timeout.
    """
def has_line_changes(
    lines_changed_only: int, diff_chunks: List[List[int]], additions: List[int]
) -> bool:
    """Does this file actually apply to condition specified by ``lines_changed_only``?

    :param lines_changed_only: A value that means:

        - 0 = We don't care. Analyze the whole file.
        - 1 = Only analyze lines in the diff chunks, which may include unchanged
          lines but not lines with subtractions.
        - 2 = Only analyze lines with additions.
    :param diff_chunks: The ranges of lines in the diff for a single file.
    :param additions: The lines with additions in the diff for a single file.

    :returns: `True` if ``lines_changed_only`` is ``0``, or if the list that
        corresponds to the chosen mode (``diff_chunks`` for ``1``, ``additions``
        for ``2``) is not empty. Any other combination yields `False`.
    """
    if not lines_changed_only:
        # whole-file analysis applies to every file
        return True
    if lines_changed_only == 1:
        return len(diff_chunks) > 0
    if lines_changed_only == 2:
        return len(additions) > 0
    return False
def get_line_cnt_from_cols(data: bytes, offset: int) -> Tuple[int, int]:
    """Gets a line count and columns offset from a file's absolute offset.

    :param data: Bytes content to analyze.
    :param offset: The byte offset to translate

    :returns:
        A `tuple` of 2 `int` numbers:

        - Index 0 is the line number for the given offset (starting at 1).
        - Index 1 is the column number for the given offset on the line
          (starting at 1).
    """
    preceding = data[:offset]
    line = preceding.count(b"\n") + 1
    # `rfind` gives -1 when there is no earlier newline, which makes the column of
    # an offset on the first line come out as `offset + 1`.
    column = offset - preceding.rfind(b"\n")
    return (line, column)