From 8a0404ddf81a61642e516dd32025ba54d7d3676f Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Sun, 29 Mar 2015 14:32:36 +1300 Subject: [PATCH] Beginning of a simpler and more flexible search implementation --- libmproxy/console/flowview.py | 270 +------------------------------- libmproxy/console/searchable.py | 83 ++++++++++ 2 files changed, 88 insertions(+), 265 deletions(-) create mode 100644 libmproxy/console/searchable.py diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py index 1aebb0f05..a431d65f8 100644 --- a/libmproxy/console/flowview.py +++ b/libmproxy/console/flowview.py @@ -1,7 +1,7 @@ from __future__ import absolute_import import os, sys, copy import urwid -from . import common, grideditor, contentview, signals +from . import common, grideditor, contentview, signals, searchable from .. import utils, flow, controller from ..protocol.http import HTTPRequest, HTTPResponse, CONTENT_MISSING, decoded @@ -179,7 +179,7 @@ class FlowView(urwid.WidgetWrap): def conn_text_merge(self, headers, msg, body): """ Grabs what is returned by conn_text_raw and merges them all - toghether, mainly used by conn_text and search + toghether, mainly used by conn_text """ override = self.override_get() viewmode = self.viewmode_get(override) @@ -215,7 +215,7 @@ class FlowView(urwid.WidgetWrap): """ headers, msg, body = self.conn_text_raw(conn) merged = self.conn_text_merge(headers, msg, body) - return urwid.ListBox(merged) + return searchable.Searchable(merged) def _tab(self, content, attr): p = urwid.Text(content) @@ -251,251 +251,6 @@ class FlowView(urwid.WidgetWrap): ) return f - def search_wrapped_around(self, last_find_line, last_search_index, backwards): - """ - returns true if search wrapped around the bottom. - """ - - current_find_line = self.state.get_flow_setting(self.flow, - "last_find_line") - current_search_index = self.state.get_flow_setting(self.flow, - "last_search_index") - - if not backwards: - message = "search hit BOTTOM, continuing at TOP" - if current_find_line <= last_find_line: - return True, message - elif current_find_line == last_find_line: - if current_search_index <= last_search_index: - return True, message - else: - message = "search hit TOP, continuing at BOTTOM" - if current_find_line >= last_find_line: - return True, message - elif current_find_line == last_find_line: - if current_search_index >= last_search_index: - return True, message - - return False, "" - - def search_again(self, backwards=False): - """ - runs the previous search again, forwards or backwards. - """ - last_search_string = self.state.get_flow_setting( - self.flow, "last_search_string" - ) - if last_search_string: - message = self.search(last_search_string, backwards) - if message: - signals.status_message.send(message=message) - else: - message = "no previous searches have been made" - signals.status_message.send(message=message) - - return message - - def search(self, search_string, backwards=False): - """ - similar to view_response or view_request, but instead of just - displaying the conn, it highlights a word that the user is - searching for and handles all the logic surrounding that. - """ - - if not search_string: - search_string = self.state.get_flow_setting(self.flow, - "last_search_string") - if not search_string: - return - - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - text = self.flow.request - const = common.VIEW_FLOW_REQUEST - else: - text = self.flow.response - const = common.VIEW_FLOW_RESPONSE - if not self.flow.response: - return "no response to search in" - - last_find_line = self.state.get_flow_setting(self.flow, - "last_find_line") - last_search_index = self.state.get_flow_setting(self.flow, - "last_search_index") - - # generate the body, highlight the words and get focus - headers, msg, body = self.conn_text_raw(text) - try: - body, focus_position = self.search_highlight_text( - body, - search_string, - backwards=backwards - ) - except SearchError: - return "Search not supported in this view." - - if focus_position == None: - # no results found. - return "no matches for '%s'" % search_string - - # UI stuff. - merged = self.conn_text_merge(headers, msg, body) - list_box = urwid.ListBox(merged) - list_box.set_focus(focus_position + 2) - self._w = self.wrap_body(const, list_box) - signals.update_settings.send(self) - - self.last_displayed_body = list_box - - wrapped, wrapped_message = self.search_wrapped_around( - last_find_line, - last_search_index, - backwards - ) - - if wrapped: - return wrapped_message - - def search_get_start(self, search_string): - start_line = 0 - start_index = 0 - last_search_string = self.state.get_flow_setting( - self.flow, - "last_search_string" - ) - if search_string == last_search_string: - start_line = self.state.get_flow_setting( - self.flow, - "last_find_line" - ) - start_index = self.state.get_flow_setting(self.flow, - "last_search_index") - - if start_index == None: - start_index = 0 - else: - start_index += len(search_string) - - if start_line == None: - start_line = 0 - - else: - self.state.add_flow_setting(self.flow, "last_search_string", - search_string) - - return (start_line, start_index) - - def search_get_range(self, len_text_objects, start_line, backwards): - if not backwards: - loop_range = xrange(start_line, len_text_objects) - else: - loop_range = xrange(start_line, -1, -1) - - return loop_range - - def search_find(self, text, search_string, start_index, backwards): - if backwards == False: - find_index = text.find(search_string, start_index) - else: - if start_index != 0: - start_index -= len(search_string) - else: - start_index = None - - find_index = text.rfind(search_string, 0, start_index) - - return find_index - - def search_highlight_text(self, text_objects, search_string, looping = False, backwards = False): - start_line, start_index = self.search_get_start(search_string) - i = start_line - - found = False - text_objects = copy.deepcopy(text_objects) - loop_range = self.search_get_range( - len(text_objects), - start_line, backwards - ) - for i in loop_range: - text_object = text_objects[i] - - try: - text, style = text_object.get_text() - except AttributeError: - raise SearchError() - - if i != start_line: - start_index = 0 - - find_index = self.search_find( - text, - search_string, - start_index, - backwards - ) - - if find_index != -1: - new_text = self.search_highlight_object( - text, - find_index, - search_string - ) - text_objects[i] = new_text - - found = True - self.state.add_flow_setting(self.flow, "last_search_index", - find_index) - self.state.add_flow_setting(self.flow, "last_find_line", i) - - break - - # handle search WRAP - if found: - focus_pos = i - else : - if looping: - focus_pos = None - else: - if not backwards: - self.state.add_flow_setting( - self.flow, "last_search_index", 0 - ) - self.state.add_flow_setting( - self.flow, "last_find_line", 0 - ) - else: - self.state.add_flow_setting( - self.flow, "last_search_index", None - ) - self.state.add_flow_setting( - self.flow, "last_find_line", len(text_objects) - 1 - ) - - text_objects, focus_pos = self.search_highlight_text( - text_objects, - search_string, - looping=True, - backwards=backwards - ) - - return text_objects, focus_pos - - def search_highlight_object(self, text_object, find_index, search_string): - """ - just a little abstraction - """ - before = text_object[:find_index] - after = text_object[find_index+len(search_string):] - - new_text = urwid.Text( - [ - before, - (self.highlight_color, search_string), - after, - ] - ) - - return new_text - def view_request(self): self.state.view_flow_mode = common.VIEW_FLOW_REQUEST body = self.conn_text(self.flow.request) @@ -506,7 +261,7 @@ class FlowView(urwid.WidgetWrap): if self.flow.response: body = self.conn_text(self.flow.response) else: - body = urwid.ListBox( + body = searchable.Searchable( [ urwid.Text(""), urwid.Text( @@ -889,23 +644,8 @@ class FlowView(urwid.WidgetWrap): args = (conn,) ) signals.flow_change.send(self, flow = self.flow) - elif key == "/": - last_search_string = self.state.get_flow_setting( - self.flow, - "last_search_string" - ) - search_prompt = "Search body ["+last_search_string+"]" if last_search_string else "Search body" - signals.status_prompt.send( - prompt = search_prompt, - text = "", - callback = self.search - ) - elif key == "n": - self.search_again(backwards=False) - elif key == "N": - self.search_again(backwards=True) else: - return key + return super(self.__class__, self).keypress(size, key) def encode_callback(self, key, conn): encoding_map = { diff --git a/libmproxy/console/searchable.py b/libmproxy/console/searchable.py new file mode 100644 index 000000000..dc8b0bab9 --- /dev/null +++ b/libmproxy/console/searchable.py @@ -0,0 +1,83 @@ +import urwid + +from . import signals + + +class Highlight(urwid.AttrMap): + def __init__(self, t): + urwid.AttrMap.__init__( + self, + urwid.Text(t.text), + "focusfield", + ) + self.backup = t + + +class Searchable(urwid.ListBox): + def __init__(self, contents): + urwid.ListBox.__init__( + self, + urwid.SimpleFocusListWalker(contents) + ) + + self.search_offset = 0 + self.current_highlight = None + self.search_term = None + + def keypress(self, size, key): + if key == "/": + signals.status_prompt.send( + prompt = "Search for", + text = self.search_term or "", + callback = self.set_search + ) + if key == "n": + self.find_next(False) + if key == "N": + self.find_next(True) + else: + return super(self.__class__, self).keypress(size, key) + + def set_search(self, text): + self.search_term = text or None + self.find_next(False) + + def set_highlight(self, offset): + if self.current_highlight is not None: + old = self.body[self.current_highlight] + self.body[self.current_highlight] = old.backup + if offset is None: + self.current_highlight = None + else: + self.body[offset] = Highlight(self.body[offset]) + self.current_highlight = offset + + def get_text(self, w): + if isinstance(w, urwid.Text): + return w.text + elif isinstance(w, Highlight): + return w.backup.text + else: + return None + + def find_next(self, backwards): + if not self.search_term: + self.set_highlight(None) + return + # Start search at focus + 1 + if backwards: + rng = xrange(len(self.body)-1, -1, -1) + else: + rng = xrange(1, len(self.body)) + for i in rng: + off = (self.focus_position + i)%len(self.body) + w = self.body[off] + txt = self.get_text(w) + if txt and self.search_term in txt: + self.set_highlight(off) + self.set_focus(off, coming_from="above") + self.body._modified() + return + else: + self.set_highlight(None) + signals.status_message.send(message="Search not found.", expire=1)