mirror of
https://github.com/RPCS3/discord-bot.git
synced 2026-01-31 01:25:22 +01:00
380 lines
17 KiB
Python
380 lines
17 KiB
Python
import re
|
|
|
|
import itertools
|
|
from collections import deque
|
|
from api import sanitize_string, trim_string
|
|
from api.result import ApiResult
|
|
from bot_config import piracy_strings
|
|
from bot_utils import get_code
|
|
|
|
SERIAL_PATTERN = re.compile('Serial: (?P<id>[A-z]{4}\\d{5})')
|
|
LIBRARIES_PATTERN = re.compile('Load libraries:(?P<libraries>.*)', re.DOTALL | re.MULTILINE)
|
|
|
|
|
|
class LogAnalyzer(object):
|
|
ERROR_SUCCESS = 0
|
|
ERROR_PIRACY = 1
|
|
ERROR_STOP = 2
|
|
ERROR_OVERFLOW = -1
|
|
ERROR_FAIL = -2
|
|
ERROR_DEFLATE_64 = -3
|
|
|
|
def clear_results(self):
|
|
if self is None or self.parsed_data is None:
|
|
return self.ERROR_SUCCESS
|
|
if self.build_and_specs is None:
|
|
self.build_and_specs = self.parsed_data["build_and_specs"]
|
|
self.trigger = None
|
|
self.trigger_context = None
|
|
self.buffer = ''
|
|
self.buffer_lines.clear()
|
|
self.libraries = []
|
|
self.parsed_data = { "build_and_specs": self.build_and_specs }
|
|
return self.ERROR_SUCCESS
|
|
|
|
def piracy_check(self):
|
|
for trigger in piracy_strings:
|
|
lower_trigger = trigger.lower()
|
|
if lower_trigger in self.buffer.lower():
|
|
self.trigger = trigger
|
|
for line in self.buffer_lines:
|
|
if lower_trigger in line.lower():
|
|
self.trigger_context = line
|
|
return self.ERROR_PIRACY
|
|
return self.ERROR_SUCCESS
|
|
|
|
def done(self):
|
|
return self.ERROR_STOP
|
|
|
|
def done_and_reset(self):
|
|
self.phase_index = 0
|
|
return self.ERROR_STOP
|
|
|
|
def get_id(self):
|
|
try:
|
|
self.product_info = get_code(re.search(SERIAL_PATTERN, self.buffer).group('id'))
|
|
return self.ERROR_SUCCESS
|
|
except AttributeError:
|
|
self.product_info = ApiResult(None, dict({"status": "Unknown"}))
|
|
return self.ERROR_SUCCESS
|
|
|
|
def parse_rsx(self):
|
|
if len(self.buffer_lines) < 4:
|
|
return False
|
|
if all(line.startswith('·!') for line in itertools.islice(self.buffer_lines, 4)) and \
|
|
all('RSX: ' in line for line in itertools.islice(self.buffer_lines, 4)):
|
|
return True
|
|
return False
|
|
|
|
def get_libraries(self):
|
|
try:
|
|
self.libraries = [lib.strip().replace('.sprx', '')
|
|
for lib
|
|
in re.search(LIBRARIES_PATTERN, self.buffer).group('libraries').strip()[1:].split('-')]
|
|
except KeyError as ke:
|
|
print(ke)
|
|
pass
|
|
return self.ERROR_SUCCESS
|
|
|
|
"""
|
|
End Trigger
|
|
Regex
|
|
Message To Print
|
|
Special Return
|
|
"""
|
|
phase = (
|
|
{
|
|
'end_trigger': '·',
|
|
'regex': re.compile('(?P<build_and_specs>.*)', flags=re.DOTALL | re.MULTILINE),
|
|
'function': clear_results
|
|
},
|
|
{
|
|
'end_trigger': 'Core:',
|
|
'regex': re.compile('Path: (?:(?P<win_path>\\w:/)|(?P<lin_path>/)).*?\n'
|
|
'(?:.* custom config: (?P<custom_config>.*?)\n.*?)?',
|
|
flags=re.DOTALL | re.MULTILINE),
|
|
'function': [get_id, piracy_check]
|
|
},
|
|
{
|
|
'end_trigger': 'VFS:',
|
|
'regex': re.compile('Decoder: (?P<ppu_decoder>.*?)\n.*?'
|
|
'Threads: (?P<ppu_threads>.*?)\n.*?'
|
|
'(?:scheduler: (?P<thread_scheduler>.*?)\n.*?)?'
|
|
'Decoder: (?P<spu_decoder>.*?)\n.*?'
|
|
'(?:secondary cores: (?P<spu_secondary_cores>.*?)\n.*?)?'
|
|
'priority: (?P<spu_lower_thread_priority>.*?)\n.*?'
|
|
'SPU Threads: (?P<spu_threads>.*?)\n.*?'
|
|
'penalty: (?P<spu_delay_penalty>.*?)\n.*?'
|
|
'detection: (?P<spu_loop_detection>.*?)\n.*?'
|
|
'[Ll]oader: (?P<lib_loader>.*?)\n.*?'
|
|
'functions: (?P<hook_static_functions>.*?)\n.*',
|
|
flags=re.DOTALL | re.MULTILINE),
|
|
'function': get_libraries
|
|
},
|
|
{
|
|
'end_trigger': 'Video:',
|
|
'regex': None,
|
|
'function': None
|
|
},
|
|
{
|
|
'end_trigger': 'Audio:',
|
|
'regex': re.compile('Renderer: (?P<renderer>.*?)\n.*?'
|
|
'Resolution: (?P<resolution>.*?)\n.*?'
|
|
'Aspect ratio: (?P<aspect_ratio>.*?)\n.*?'
|
|
'Frame limit: (?P<frame_limit>.*?)\n.*?'
|
|
'Write Color Buffers: (?P<write_color_buffers>.*?)\n.*?'
|
|
'VSync: (?P<vsync>.*?)\n.*?'
|
|
'Use GPU texture scaling: (?P<gpu_texture_scaling>.*?)\n.*?'
|
|
'Strict Rendering Mode: (?P<strict_rendering_mode>.*?)\n.*?'
|
|
'Disable Vertex Cache: (?P<vertex_cache>.*?)\n.*?'
|
|
'Blit: (?P<cpu_blit>.*?)\n.*?'
|
|
'Resolution Scale: (?P<resolution_scale>.*?)\n.*?'
|
|
'Anisotropic Filter Override: (?P<af_override>.*?)\n.*?'
|
|
'Minimum Scalable Dimension: (?P<texture_scale_threshold>.*?)\n.*?'
|
|
'(?:D3D12|DirectX 12):\\s*\n\\s*Adapter: (?P<d3d_gpu>.*?)\n.*?'
|
|
'Vulkan:\\s*\n\\s*Adapter: (?P<vulkan_gpu>.*?)\n.*?',
|
|
flags=re.DOTALL | re.MULTILINE)
|
|
},
|
|
{
|
|
'end_trigger': 'Log:',
|
|
'regex': None,
|
|
'function': done
|
|
},
|
|
{
|
|
'end_trigger': 'Objects cleared...',
|
|
'regex': re.compile('(?:'
|
|
'RSX:(?:\\d|\\.|\\s|\\w|-)* (?P<driver_version>(?:\\d+\\.)*\\d+)\n[^\n]*?'
|
|
'RSX: [^\n]+\n[^\n]*?'
|
|
'RSX: (?P<driver_manuf>.*?)\n[^\n]*?'
|
|
'RSX: Supported texel buffer size reported: (?P<texel_buffer_size>\\d*?) bytes'
|
|
')|(?:'
|
|
'GL RENDERER: (?P<driver_manuf_new>.*?)\n[^\n]*?'
|
|
'GL VERSION:(?:\\d|\\.|\\s|\\w|-)* (?P<driver_version_new>(?:\\d+\\.)*\\d+)\n[^\n]*?'
|
|
'RSX: [^\n]+\n[^\n]*?'
|
|
'RSX: Supported texel buffer size reported: (?P<texel_buffer_size_new>\\d*?) bytes'
|
|
')\n.*?',
|
|
flags=re.DOTALL | re.MULTILINE),
|
|
'on_buffer_flush': parse_rsx,
|
|
'function': done_and_reset
|
|
}
|
|
)
|
|
|
|
def __init__(self):
|
|
self.buffer = ''
|
|
self.buffer_lines = deque([])
|
|
self.total_data_len = 0
|
|
self.phase_index = 0
|
|
self.build_and_specs = None
|
|
self.trigger = None
|
|
self.trigger_context = None
|
|
self.libraries = []
|
|
self.parsed_data = {}
|
|
|
|
def feed(self, data):
|
|
self.total_data_len += len(data)
|
|
if self.total_data_len > 32 * 1024 * 1024:
|
|
return self.ERROR_OVERFLOW
|
|
if self.phase[self.phase_index]['end_trigger'] in data \
|
|
or self.phase[self.phase_index]['end_trigger'] is data.strip():
|
|
error_code = self.process_data()
|
|
self.buffer = ''
|
|
self.buffer_lines.clear()
|
|
if error_code == self.ERROR_SUCCESS or error_code == self.ERROR_STOP:
|
|
self.phase_index += 1
|
|
if error_code != self.ERROR_SUCCESS:
|
|
self.sanitize()
|
|
return error_code
|
|
else:
|
|
if len(self.buffer_lines) > 256:
|
|
error_code = self.process_data(True)
|
|
if error_code != self.ERROR_SUCCESS:
|
|
self.sanitize()
|
|
return error_code
|
|
self.buffer_lines.popleft()
|
|
self.buffer_lines.append(data)
|
|
return self.ERROR_SUCCESS
|
|
|
|
def process_data(self, on_buffer_flush = False):
|
|
current_phase = self.phase[self.phase_index]
|
|
# if buffer was flushed and check function found relevant data, run regex
|
|
if on_buffer_flush:
|
|
try:
|
|
if current_phase['on_buffer_flush'] is not None:
|
|
if not current_phase['on_buffer_flush'](self):
|
|
return self.ERROR_SUCCESS
|
|
except KeyError:
|
|
pass
|
|
self.buffer = '\n'.join(self.buffer_lines)
|
|
if current_phase['regex'] is not None:
|
|
try:
|
|
regex_result = re.search(current_phase['regex'], self.buffer.strip() + '\n')
|
|
if regex_result is not None:
|
|
group_args = regex_result.groupdict()
|
|
self.parsed_data.update(group_args)
|
|
except AttributeError as ae:
|
|
print(ae)
|
|
print("Regex failed!")
|
|
return self.ERROR_FAIL
|
|
try:
|
|
# run funcitons only on end_trigger
|
|
if current_phase['function'] is not None and not on_buffer_flush:
|
|
if isinstance(current_phase['function'], list):
|
|
for func in current_phase['function']:
|
|
error_code = func(self)
|
|
if error_code != self.ERROR_SUCCESS:
|
|
return error_code
|
|
return self.ERROR_SUCCESS
|
|
else:
|
|
return current_phase['function'](self)
|
|
except KeyError:
|
|
pass
|
|
return self.ERROR_SUCCESS
|
|
|
|
def sanitize(self):
|
|
result = {}
|
|
for k, v in self.parsed_data.items():
|
|
r = sanitize_string(v)
|
|
if r is not None:
|
|
if r == "true":
|
|
r = "[x]"
|
|
elif r == "false":
|
|
r = "[ ]"
|
|
result[k] = r
|
|
self.parsed_data = result
|
|
libs = []
|
|
for l in self.libraries:
|
|
libs.append(sanitize_string(l))
|
|
self.libraries = libs
|
|
if self.trigger is not None:
|
|
self.trigger = sanitize_string(self.trigger)
|
|
if self.trigger_context is not None:
|
|
self.trigger_context = sanitize_string(trim_string(self.trigger_context, 256))
|
|
|
|
def get_trigger(self):
|
|
return self.trigger
|
|
|
|
def get_trigger_context(self):
|
|
return self.trigger_context
|
|
|
|
def process_final_data(self):
|
|
group_args = self.parsed_data
|
|
if 'strict_rendering_mode' in group_args and group_args['strict_rendering_mode'] == 'true':
|
|
group_args['resolution_scale'] = "Strict Mode"
|
|
if 'spu_threads' in group_args and group_args['spu_threads'] == '0':
|
|
group_args['spu_threads'] = 'Auto'
|
|
if 'spu_secondary_cores' in group_args and group_args['spu_secondary_cores'] is not None:
|
|
group_args['thread_scheduler'] = group_args['spu_secondary_cores']
|
|
if 'driver_manuf_new' in group_args and group_args['driver_manuf_new'] is not None:
|
|
group_args['gpu_info'] = group_args['driver_manuf_new']
|
|
elif 'vulkan_gpu' in group_args and group_args['vulkan_gpu'] != '""':
|
|
group_args['gpu_info'] = group_args['vulkan_gpu']
|
|
elif 'd3d_gpu' in group_args and group_args['d3d_gpu'] != '""':
|
|
group_args['gpu_info'] = group_args['d3d_gpu']
|
|
elif 'driver_manuf' in group_args and group_args['driver_manuf'] is not None:
|
|
group_args['gpu_info'] = group_args['driver_manuf']
|
|
else:
|
|
group_args['gpu_info'] = 'Unknown'
|
|
if 'driver_version_new' in group_args and group_args["driver_version_new"] is not None:
|
|
group_args["gpu_info"] = group_args["gpu_info"] + " (" + group_args["driver_version_new"] + ")"
|
|
elif 'driver_version' in group_args and group_args["driver_version"] is not None:
|
|
group_args["gpu_info"] = group_args["gpu_info"] + " (" + group_args["driver_version"] + ")"
|
|
if 'af_override' in group_args:
|
|
if group_args['af_override'] == '0':
|
|
group_args['af_override'] = 'Auto'
|
|
elif group_args['af_override'] == '1':
|
|
group_args['af_override'] = 'Disabled'
|
|
|
|
def get_text_report(self):
|
|
self.process_final_data()
|
|
additional_info = {
|
|
'product_info': self.product_info.to_string(),
|
|
'libs': ', '.join(self.libraries) if len(self.libraries) > 0 and self.libraries[0] != "]" else "None",
|
|
'os': 'Windows' if 'win_path' in self.parsed_data and self.parsed_data['win_path'] is not None else 'Linux',
|
|
'config': '\nUsing custom config!\n' if 'custom_config' in self.parsed_data and self.parsed_data['custom_config'] is not None else ''
|
|
}
|
|
additional_info.update(self.parsed_data)
|
|
return (
|
|
'```'
|
|
'{product_info}\n'
|
|
'\n'
|
|
'{build_and_specs}'
|
|
'GPU: {gpu_info}\n'
|
|
'OS: {os}\n'
|
|
'{config}\n'
|
|
'PPU Decoder: {ppu_decoder:>21s} | Thread Scheduler: {thread_scheduler}\n'
|
|
'SPU Decoder: {spu_decoder:>21s} | SPU Threads: {spu_threads}\n'
|
|
'SPU Lower Thread Priority: {spu_lower_thread_priority:>7s} | Hook Static Functions: {hook_static_functions}\n'
|
|
'SPU Loop Detection: {spu_loop_detection:>14s} | Lib Loader: {lib_loader}\n'
|
|
'\n'
|
|
'Selected Libraries: {libs}\n'
|
|
'\n'
|
|
'Renderer: {renderer:>24s} | Frame Limit: {frame_limit}\n'
|
|
'Resolution: {resolution:>22s} | Write Color Buffers: {write_color_buffers}\n'
|
|
'Resolution Scale: {resolution_scale:>16s} | Use GPU texture scaling: {gpu_texture_scaling}\n'
|
|
'Resolution Scale Threshold: {texture_scale_threshold:>6s} | Anisotropic Filter Override: {af_override}\n'
|
|
'VSync: {vsync:>27s} | Disable Vertex Cache: {vertex_cache}\n'
|
|
'```'
|
|
).format(**additional_info)
|
|
|
|
def get_embed_report(self):
|
|
self.process_final_data()
|
|
lib_loader = self.parsed_data['lib_loader']
|
|
if lib_loader is not None:
|
|
lib_loader = lib_loader.lower()
|
|
lib_loader_auto = 'auto' in lib_loader
|
|
lib_loader_manual = 'manual' in lib_loader
|
|
if lib_loader_auto and lib_loader_manual:
|
|
self.parsed_data['lib_loader'] = "Auto & manual select"
|
|
elif lib_loader_auto:
|
|
self.parsed_data['lib_loader'] = "Auto"
|
|
elif lib_loader_manual:
|
|
self.parsed_data['lib_loader'] = "Manual selection"
|
|
custom_config = 'custom_config' in self.parsed_data and self.parsed_data['custom_config'] is not None
|
|
self.parsed_data['os_path'] = 'Windows' if 'win_path' in self.parsed_data and self.parsed_data['win_path'] is not None else 'Linux'
|
|
result = self.product_info.to_embed(False).add_field(
|
|
name='Build Info',
|
|
value=(
|
|
'{build_and_specs}'
|
|
'GPU: {gpu_info}'
|
|
).format(**self.parsed_data),
|
|
inline=False
|
|
).add_field(
|
|
name='CPU Settings' if not custom_config else 'Per-game CPU Settings',
|
|
value=(
|
|
'`PPU Decoder: {ppu_decoder:>21s}`\n'
|
|
'`SPU Decoder: {spu_decoder:>21s}`\n'
|
|
'`SPU Lower Thread Priority: {spu_lower_thread_priority:>7s}`\n'
|
|
'`SPU Loop Detection: {spu_loop_detection:>14s}`\n'
|
|
'`Thread Scheduler: {thread_scheduler:>16s}`\n'
|
|
'`Detected OS: {os_path:>21s}`\n'
|
|
'`SPU Threads: {spu_threads:>21s}`\n'
|
|
'`Force CPU Blit: {cpu_blit:>18s}`\n'
|
|
'`Hook Static Functions: {hook_static_functions:>11s}`\n'
|
|
'`Lib Loader: {lib_loader:>22s}`\n'
|
|
).format(**self.parsed_data),
|
|
inline=True
|
|
).add_field(
|
|
name='GPU Settings' if not custom_config else 'Per-game GPU Settings',
|
|
value=(
|
|
'`Renderer: {renderer:>24s}`\n'
|
|
'`Aspect ratio: {aspect_ratio:>20s}`\n'
|
|
'`Resolution: {resolution:>22s}`\n'
|
|
'`Resolution Scale: {resolution_scale:>16s}`\n'
|
|
'`Resolution Scale Threshold: {texture_scale_threshold:>6s}`\n'
|
|
'`Write Color Buffers: {write_color_buffers:>13s}`\n'
|
|
'`Use GPU texture scaling: {gpu_texture_scaling:>9s}`\n'
|
|
'`Anisotropic Filter: {af_override:>14s}`\n'
|
|
'`Frame Limit: {frame_limit:>21s}`\n'
|
|
# '`VSync: {vsync:>27s}`\n'
|
|
'`Disable Vertex Cache: {vertex_cache:>12s}`\n'
|
|
).format(**self.parsed_data),
|
|
inline=True
|
|
)
|
|
if 'manual' in self.parsed_data['lib_loader'].lower():
|
|
result = result.add_field(
|
|
name="Selected Libraries",
|
|
value=', '.join(self.libraries) if len(self.libraries) > 0 and self.libraries[0] != "]" else "None",
|
|
inline=False
|
|
)
|
|
return result
|