slpkg/slpkg/multi_process.py
Dimitris Zlatanidis 1d25f0e9fa Updated for colors
2024-03-21 18:46:42 +02:00

146 lines
5.5 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import subprocess
from datetime import datetime
from multiprocessing import Process
from slpkg.configs import Configs
from slpkg.utilities import Utilities
from slpkg.error_messages import Errors
from slpkg.views.asciibox import AsciiBox
from slpkg.progress_bar import ProgressBar
class MultiProcess(Configs):
    """Run shell commands, optionally behind a progress bar and with a log file.

    The class reads several attributes it does not define itself
    (``silent_mode``, ``process_log``, ``slpkg_log_file``, ``bgreen``,
    ``bred``, ``endc``); they are presumably provided by ``Configs`` --
    verify against that class.
    """

    def __init__(self, flags=None):
        """Create the helpers and the log header/footer messages.

        Args:
            flags: Command line flags, or None. When given, they are checked
                for the silent option ('-n', '--silent').
        """
        # NOTE(review): ``super(Configs, self)`` starts the lookup *after*
        # ``Configs`` in the MRO, so ``Configs.__init__`` itself is not
        # called here. Kept as in the original -- confirm this is intended.
        super(Configs, self).__init__()

        self.utils = Utilities()
        self.progress = ProgressBar()
        self.ascii = AsciiBox()
        self.errors = Errors()

        self.timestamp: str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.head_message: str = f'Timestamp: {self.timestamp}'
        self.bottom_message: str = 'EOF - End of log file'

        # Always define the attribute: the other methods read it
        # unconditionally, so leaving it unset when ``flags`` is None would
        # raise AttributeError later on.
        self.option_for_silent: bool = False
        if flags is not None:
            self.option_for_silent = self.utils.is_option(
                ('-n', '--silent'), flags)

    def run(self, command: str, filename: str, progress_message: str) -> None:
        """Run a command, behind a progress bar when silent mode is active.

        In silent mode the command and the progress bar run in two separate
        processes and a final done/failed status line is printed for
        ``filename``. Otherwise the command runs in the current process with
        its output echoed to the terminal.

        Args:
            command: The command of process.
            filename: The filename of process.
            progress_message: The message of progress.

        Returns:
            None.
        """
        if not (self.silent_mode or self.option_for_silent):
            self.process_and_log(command)
            return

        done: str = f'{self.bgreen}{self.ascii.done}{self.endc}'
        failed: str = f'{self.bred}{self.ascii.failed}{self.endc}'

        # Starting multiprocessing
        process_1 = Process(target=self.process_and_log, args=(command,))
        process_2 = Process(target=self.progress.progress_bar, args=(progress_message, filename))

        process_1.start()
        process_2.start()

        try:
            try:
                # Wait until process 1 finish
                process_1.join()
            finally:
                # Stop the progress bar and reap it, so it cannot keep
                # drawing over the status line and is not left behind as a
                # zombie (also when the wait above is interrupted).
                process_2.terminate()
                process_2.join()

            if process_1.exitcode != 0:
                color, status = self.bred, failed
            else:
                color, status = self.bgreen, done

            print(f"\r{'':>2}{color}{self.ascii.bullet}{self.endc} {filename} {status}{' ' * 17}", end='\r')
        finally:
            # Restore the terminal cursor
            print('\x1b[?25h', self.endc)

    def process_and_log(self, command: str, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) -> None:
        """Run a shell command, echoing and/or logging its output line by line.

        Each output line is printed (stripped) unless silent mode is active,
        and appended to the log file when ``process_log`` is enabled.

        Args:
            command: The command of process.
            stdout: Where the child's stdout goes (passed to Popen).
            stderr: Where the child's stderr goes (passed to Popen).

        Returns:
            None.

        Raises:
            SystemExit: With the child's return code when it is not zero.
        """
        process = subprocess.Popen(command, shell=True, stdout=stdout, stderr=stderr, text=True)

        self._write_log_head()

        echo: bool = not self.silent_mode and not self.option_for_silent

        # Open the log once instead of once per line; line buffering keeps
        # every line flushed to disk as soon as it is written.
        log = open(self.slpkg_log_file, 'a', buffering=1) if self.process_log else None
        try:
            # ``process.stdout`` is None when the caller did not ask for a
            # pipe; in that case there is nothing to read here.
            if process.stdout is not None:
                # Write the process to the log file and to the terminal.
                with process.stdout as output:
                    for line in output:
                        if echo:
                            print(line.strip())  # Print to console
                        if log is not None:
                            log.write(line)  # Write to log file
        finally:
            if log is not None:
                log.close()

        self._write_log_eof()

        process.wait()  # Wait for the process to finish

        # If the process failed, return exit code.
        if process.returncode != 0:
            self._error_process()
            raise SystemExit(process.returncode)

    def _error_process(self):
        """ Prints error message for a process (nothing in silent mode). """
        if self.silent_mode or self.option_for_silent:
            return

        message: str = 'Error occurred with process. Please check the log file.'
        ruler: str = len(message) * '='
        print()
        print(ruler)
        print(f'{self.bred}{message}{self.endc}')
        print(ruler)
        print()

    def _write_log_head(self) -> None:
        """ Write the timestamp at the head of the log file. """
        if self.process_log:
            ruler: str = len(self.head_message) * '='
            with open(self.slpkg_log_file, 'a') as log:
                log.write(f"{ruler}\n")
                log.write(f'{self.head_message}\n')
                log.write(f"{ruler}\n")

    def _write_log_eof(self) -> None:
        """ Write the bottom of the log file. """
        if self.process_log:
            ruler: str = len(self.bottom_message) * '='
            with open(self.slpkg_log_file, 'a') as log:
                log.write(f"\n{ruler}\n")
                log.write(f'{self.bottom_message}\n')
                log.write(f"{ruler}\n\n")

    def process(self, command: str, stderr=None, stdout=None) -> None:
        """Run a shell command and exit if it fails.

        Unlike ``process_and_log`` this method writes no log file and does
        not read the child's output itself.

        Args:
            command: The command of process.
            stderr: Where the child's stderr goes (passed to subprocess.run).
            stdout: Where the child's stdout goes (passed to subprocess.run).

        Returns:
            None.

        Raises:
            SystemExit: With code 1 on KeyboardInterrupt, or with the child's
                return code when it is not zero.
        """
        try:
            output = subprocess.run(command, shell=True, stderr=stderr, stdout=stdout)
        except KeyboardInterrupt:
            raise SystemExit(1)

        if output.returncode != 0:
            # Download tools get a dedicated error message first.
            if command.startswith(('wget', 'wget2', 'curl', 'lftp')):
                message: str = f"to run command '{command.split()[0]}' exit code: {output.returncode}"
                self.errors.raise_error_message(message, output.returncode)
            raise SystemExit(output.returncode)