Improved code quality

This commit is contained in:
Dimitris Zlatanidis 2024-04-27 13:41:42 +03:00
parent f3c8065fa8
commit f23fb21b90

View file

@ -14,9 +14,13 @@ from slpkg.views.asciibox import AsciiBox
from slpkg.progress_bar import ProgressBar from slpkg.progress_bar import ProgressBar
class MultiProcess(Configs): class MultiProcess(Configs): # pylint: disable=[R0902]
def __init__(self, flags=None): """
Creates parallel process between progress bar and process.
"""
def __init__(self, flags: list = None):
super(Configs, self).__init__() super(Configs, self).__init__()
self.utils = Utilities() self.utils = Utilities()
@ -83,7 +87,7 @@ class MultiProcess(Configs):
else: else:
self._run(command) self._run(command)
def _run(self, command: str, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) -> None: def _run(self, command: str, stdout: subprocess = subprocess.PIPE, stderr: subprocess = subprocess.STDOUT) -> None:
""" """
Build the package and write a log file. Build the package and write a log file.
Args: Args:
@ -93,30 +97,31 @@ class MultiProcess(Configs):
Returns: Returns:
None. None.
""" """
process = subprocess.Popen(command, shell=True, stdout=stdout, stderr=stderr, text=True) with subprocess.Popen(command, shell=True, stdout=stdout, stderr=stderr, text=True) as process:
self._write_log_head() self._write_log_head()
# Write the process to the log file and to the terminal. # Write the process to the log file and to the terminal.
with process.stdout as output: # type: ignore[union-attr] with process.stdout as output: # type: ignore[union-attr]
for i, line in enumerate(output): for line in output:
if not self.progress_bar_conf and not self.option_for_progress_bar: if not self.progress_bar_conf and not self.option_for_progress_bar:
print(line.strip()) # Print to console print(line.strip()) # Print to console
if self.process_log: if self.process_log:
with open(self.slpkg_log_file, 'a') as log: with open(self.slpkg_log_file, 'a', encoding='utf-8') as log:
log.write(line) # Write to log file log.write(line) # Write to log file
self._write_log_eof() self._write_log_eof()
process.wait() # Wait for the process to finish process.wait() # Wait for the process to finish
# If the process failed, return exit code. # If the process failed, return exit code.
if process.returncode != 0: if process.returncode != 0:
self._error_process() self._error_process()
raise SystemExit(process.returncode) raise SystemExit(process.returncode)
def _error_process(self) -> None: def _error_process(self) -> None:
""" Prints error message for a process. """ """ Prints error message for a process.
"""
if not self.progress_bar_conf and not self.option_for_progress_bar: if not self.progress_bar_conf and not self.option_for_progress_bar:
message: str = 'Error occurred with process. Please check the log file.' message: str = 'Error occurred with process. Please check the log file.'
print() print()
@ -126,23 +131,25 @@ class MultiProcess(Configs):
print() print()
def _write_log_head(self) -> None: def _write_log_head(self) -> None:
""" Write the timestamp at the head of the log file. """ """ Write the timestamp at the head of the log file.
"""
if self.process_log: if self.process_log:
with open(self.slpkg_log_file, 'a') as log: with open(self.slpkg_log_file, 'a', encoding='utf-8') as log:
log.write(f"{len(self.head_message) * '='}\n") log.write(f"{len(self.head_message) * '='}\n")
log.write(f'{self.head_message}\n') log.write(f'{self.head_message}\n')
log.write(f"{len(self.head_message) * '='}\n") log.write(f"{len(self.head_message) * '='}\n")
def _write_log_eof(self) -> None: def _write_log_eof(self) -> None:
""" Write the bottom of the log file. """ """ Write the bottom of the log file.
"""
if self.process_log: if self.process_log:
with open(self.slpkg_log_file, 'a') as log: with open(self.slpkg_log_file, 'a', encoding='utf-8') as log:
log.write(f"\n{len(self.bottom_message) * '='}\n") log.write(f"\n{len(self.bottom_message) * '='}\n")
log.write(f'{self.bottom_message}\n') log.write(f'{self.bottom_message}\n')
log.write(f"{len(self.bottom_message) * '='}\n\n") log.write(f"{len(self.bottom_message) * '='}\n\n")
@staticmethod @staticmethod
def process(command: str, stderr=None, stdout=None) -> None: def process(command: str, stderr: subprocess = None, stdout: subprocess = None) -> None:
""" """
Build the package and write a log file. Build the package and write a log file.
Args: Args:
@ -153,9 +160,9 @@ class MultiProcess(Configs):
None. None.
""" """
try: try:
output = subprocess.run(f'{command}', shell=True, stderr=stderr, stdout=stdout) output = subprocess.run(f'{command}', shell=True, stderr=stderr, stdout=stdout, check=True)
except KeyboardInterrupt: except KeyboardInterrupt as e:
raise SystemExit(1) raise SystemExit(1) from e
if output.returncode != 0: if output.returncode != 0:
if not command.startswith(('wget', 'wget2', 'curl', 'lftp')): if not command.startswith(('wget', 'wget2', 'curl', 'lftp')):