mirror of
https://github.com/mitmproxy/mitmproxy.git
synced 2024-11-24 21:59:44 +00:00
Beginning of a simpler and more flexible search implementation
This commit is contained in:
parent
1913975fa6
commit
8a0404ddf8
@ -1,7 +1,7 @@
|
||||
from __future__ import absolute_import
|
||||
import os, sys, copy
|
||||
import urwid
|
||||
from . import common, grideditor, contentview, signals
|
||||
from . import common, grideditor, contentview, signals, searchable
|
||||
from .. import utils, flow, controller
|
||||
from ..protocol.http import HTTPRequest, HTTPResponse, CONTENT_MISSING, decoded
|
||||
|
||||
@ -179,7 +179,7 @@ class FlowView(urwid.WidgetWrap):
|
||||
def conn_text_merge(self, headers, msg, body):
|
||||
"""
|
||||
Grabs what is returned by conn_text_raw and merges them all
|
||||
toghether, mainly used by conn_text and search
|
||||
toghether, mainly used by conn_text
|
||||
"""
|
||||
override = self.override_get()
|
||||
viewmode = self.viewmode_get(override)
|
||||
@ -215,7 +215,7 @@ class FlowView(urwid.WidgetWrap):
|
||||
"""
|
||||
headers, msg, body = self.conn_text_raw(conn)
|
||||
merged = self.conn_text_merge(headers, msg, body)
|
||||
return urwid.ListBox(merged)
|
||||
return searchable.Searchable(merged)
|
||||
|
||||
def _tab(self, content, attr):
|
||||
p = urwid.Text(content)
|
||||
@ -251,251 +251,6 @@ class FlowView(urwid.WidgetWrap):
|
||||
)
|
||||
return f
|
||||
|
||||
def search_wrapped_around(self, last_find_line, last_search_index, backwards):
|
||||
"""
|
||||
returns true if search wrapped around the bottom.
|
||||
"""
|
||||
|
||||
current_find_line = self.state.get_flow_setting(self.flow,
|
||||
"last_find_line")
|
||||
current_search_index = self.state.get_flow_setting(self.flow,
|
||||
"last_search_index")
|
||||
|
||||
if not backwards:
|
||||
message = "search hit BOTTOM, continuing at TOP"
|
||||
if current_find_line <= last_find_line:
|
||||
return True, message
|
||||
elif current_find_line == last_find_line:
|
||||
if current_search_index <= last_search_index:
|
||||
return True, message
|
||||
else:
|
||||
message = "search hit TOP, continuing at BOTTOM"
|
||||
if current_find_line >= last_find_line:
|
||||
return True, message
|
||||
elif current_find_line == last_find_line:
|
||||
if current_search_index >= last_search_index:
|
||||
return True, message
|
||||
|
||||
return False, ""
|
||||
|
||||
def search_again(self, backwards=False):
|
||||
"""
|
||||
runs the previous search again, forwards or backwards.
|
||||
"""
|
||||
last_search_string = self.state.get_flow_setting(
|
||||
self.flow, "last_search_string"
|
||||
)
|
||||
if last_search_string:
|
||||
message = self.search(last_search_string, backwards)
|
||||
if message:
|
||||
signals.status_message.send(message=message)
|
||||
else:
|
||||
message = "no previous searches have been made"
|
||||
signals.status_message.send(message=message)
|
||||
|
||||
return message
|
||||
|
||||
def search(self, search_string, backwards=False):
|
||||
"""
|
||||
similar to view_response or view_request, but instead of just
|
||||
displaying the conn, it highlights a word that the user is
|
||||
searching for and handles all the logic surrounding that.
|
||||
"""
|
||||
|
||||
if not search_string:
|
||||
search_string = self.state.get_flow_setting(self.flow,
|
||||
"last_search_string")
|
||||
if not search_string:
|
||||
return
|
||||
|
||||
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
||||
text = self.flow.request
|
||||
const = common.VIEW_FLOW_REQUEST
|
||||
else:
|
||||
text = self.flow.response
|
||||
const = common.VIEW_FLOW_RESPONSE
|
||||
if not self.flow.response:
|
||||
return "no response to search in"
|
||||
|
||||
last_find_line = self.state.get_flow_setting(self.flow,
|
||||
"last_find_line")
|
||||
last_search_index = self.state.get_flow_setting(self.flow,
|
||||
"last_search_index")
|
||||
|
||||
# generate the body, highlight the words and get focus
|
||||
headers, msg, body = self.conn_text_raw(text)
|
||||
try:
|
||||
body, focus_position = self.search_highlight_text(
|
||||
body,
|
||||
search_string,
|
||||
backwards=backwards
|
||||
)
|
||||
except SearchError:
|
||||
return "Search not supported in this view."
|
||||
|
||||
if focus_position == None:
|
||||
# no results found.
|
||||
return "no matches for '%s'" % search_string
|
||||
|
||||
# UI stuff.
|
||||
merged = self.conn_text_merge(headers, msg, body)
|
||||
list_box = urwid.ListBox(merged)
|
||||
list_box.set_focus(focus_position + 2)
|
||||
self._w = self.wrap_body(const, list_box)
|
||||
signals.update_settings.send(self)
|
||||
|
||||
self.last_displayed_body = list_box
|
||||
|
||||
wrapped, wrapped_message = self.search_wrapped_around(
|
||||
last_find_line,
|
||||
last_search_index,
|
||||
backwards
|
||||
)
|
||||
|
||||
if wrapped:
|
||||
return wrapped_message
|
||||
|
||||
def search_get_start(self, search_string):
|
||||
start_line = 0
|
||||
start_index = 0
|
||||
last_search_string = self.state.get_flow_setting(
|
||||
self.flow,
|
||||
"last_search_string"
|
||||
)
|
||||
if search_string == last_search_string:
|
||||
start_line = self.state.get_flow_setting(
|
||||
self.flow,
|
||||
"last_find_line"
|
||||
)
|
||||
start_index = self.state.get_flow_setting(self.flow,
|
||||
"last_search_index")
|
||||
|
||||
if start_index == None:
|
||||
start_index = 0
|
||||
else:
|
||||
start_index += len(search_string)
|
||||
|
||||
if start_line == None:
|
||||
start_line = 0
|
||||
|
||||
else:
|
||||
self.state.add_flow_setting(self.flow, "last_search_string",
|
||||
search_string)
|
||||
|
||||
return (start_line, start_index)
|
||||
|
||||
def search_get_range(self, len_text_objects, start_line, backwards):
|
||||
if not backwards:
|
||||
loop_range = xrange(start_line, len_text_objects)
|
||||
else:
|
||||
loop_range = xrange(start_line, -1, -1)
|
||||
|
||||
return loop_range
|
||||
|
||||
def search_find(self, text, search_string, start_index, backwards):
|
||||
if backwards == False:
|
||||
find_index = text.find(search_string, start_index)
|
||||
else:
|
||||
if start_index != 0:
|
||||
start_index -= len(search_string)
|
||||
else:
|
||||
start_index = None
|
||||
|
||||
find_index = text.rfind(search_string, 0, start_index)
|
||||
|
||||
return find_index
|
||||
|
||||
def search_highlight_text(self, text_objects, search_string, looping = False, backwards = False):
|
||||
start_line, start_index = self.search_get_start(search_string)
|
||||
i = start_line
|
||||
|
||||
found = False
|
||||
text_objects = copy.deepcopy(text_objects)
|
||||
loop_range = self.search_get_range(
|
||||
len(text_objects),
|
||||
start_line, backwards
|
||||
)
|
||||
for i in loop_range:
|
||||
text_object = text_objects[i]
|
||||
|
||||
try:
|
||||
text, style = text_object.get_text()
|
||||
except AttributeError:
|
||||
raise SearchError()
|
||||
|
||||
if i != start_line:
|
||||
start_index = 0
|
||||
|
||||
find_index = self.search_find(
|
||||
text,
|
||||
search_string,
|
||||
start_index,
|
||||
backwards
|
||||
)
|
||||
|
||||
if find_index != -1:
|
||||
new_text = self.search_highlight_object(
|
||||
text,
|
||||
find_index,
|
||||
search_string
|
||||
)
|
||||
text_objects[i] = new_text
|
||||
|
||||
found = True
|
||||
self.state.add_flow_setting(self.flow, "last_search_index",
|
||||
find_index)
|
||||
self.state.add_flow_setting(self.flow, "last_find_line", i)
|
||||
|
||||
break
|
||||
|
||||
# handle search WRAP
|
||||
if found:
|
||||
focus_pos = i
|
||||
else :
|
||||
if looping:
|
||||
focus_pos = None
|
||||
else:
|
||||
if not backwards:
|
||||
self.state.add_flow_setting(
|
||||
self.flow, "last_search_index", 0
|
||||
)
|
||||
self.state.add_flow_setting(
|
||||
self.flow, "last_find_line", 0
|
||||
)
|
||||
else:
|
||||
self.state.add_flow_setting(
|
||||
self.flow, "last_search_index", None
|
||||
)
|
||||
self.state.add_flow_setting(
|
||||
self.flow, "last_find_line", len(text_objects) - 1
|
||||
)
|
||||
|
||||
text_objects, focus_pos = self.search_highlight_text(
|
||||
text_objects,
|
||||
search_string,
|
||||
looping=True,
|
||||
backwards=backwards
|
||||
)
|
||||
|
||||
return text_objects, focus_pos
|
||||
|
||||
def search_highlight_object(self, text_object, find_index, search_string):
|
||||
"""
|
||||
just a little abstraction
|
||||
"""
|
||||
before = text_object[:find_index]
|
||||
after = text_object[find_index+len(search_string):]
|
||||
|
||||
new_text = urwid.Text(
|
||||
[
|
||||
before,
|
||||
(self.highlight_color, search_string),
|
||||
after,
|
||||
]
|
||||
)
|
||||
|
||||
return new_text
|
||||
|
||||
def view_request(self):
|
||||
self.state.view_flow_mode = common.VIEW_FLOW_REQUEST
|
||||
body = self.conn_text(self.flow.request)
|
||||
@ -506,7 +261,7 @@ class FlowView(urwid.WidgetWrap):
|
||||
if self.flow.response:
|
||||
body = self.conn_text(self.flow.response)
|
||||
else:
|
||||
body = urwid.ListBox(
|
||||
body = searchable.Searchable(
|
||||
[
|
||||
urwid.Text(""),
|
||||
urwid.Text(
|
||||
@ -889,23 +644,8 @@ class FlowView(urwid.WidgetWrap):
|
||||
args = (conn,)
|
||||
)
|
||||
signals.flow_change.send(self, flow = self.flow)
|
||||
elif key == "/":
|
||||
last_search_string = self.state.get_flow_setting(
|
||||
self.flow,
|
||||
"last_search_string"
|
||||
)
|
||||
search_prompt = "Search body ["+last_search_string+"]" if last_search_string else "Search body"
|
||||
signals.status_prompt.send(
|
||||
prompt = search_prompt,
|
||||
text = "",
|
||||
callback = self.search
|
||||
)
|
||||
elif key == "n":
|
||||
self.search_again(backwards=False)
|
||||
elif key == "N":
|
||||
self.search_again(backwards=True)
|
||||
else:
|
||||
return key
|
||||
return super(self.__class__, self).keypress(size, key)
|
||||
|
||||
def encode_callback(self, key, conn):
|
||||
encoding_map = {
|
||||
|
83
libmproxy/console/searchable.py
Normal file
83
libmproxy/console/searchable.py
Normal file
@ -0,0 +1,83 @@
|
||||
import urwid
|
||||
|
||||
from . import signals
|
||||
|
||||
|
||||
class Highlight(urwid.AttrMap):
|
||||
def __init__(self, t):
|
||||
urwid.AttrMap.__init__(
|
||||
self,
|
||||
urwid.Text(t.text),
|
||||
"focusfield",
|
||||
)
|
||||
self.backup = t
|
||||
|
||||
|
||||
class Searchable(urwid.ListBox):
|
||||
def __init__(self, contents):
|
||||
urwid.ListBox.__init__(
|
||||
self,
|
||||
urwid.SimpleFocusListWalker(contents)
|
||||
)
|
||||
|
||||
self.search_offset = 0
|
||||
self.current_highlight = None
|
||||
self.search_term = None
|
||||
|
||||
def keypress(self, size, key):
|
||||
if key == "/":
|
||||
signals.status_prompt.send(
|
||||
prompt = "Search for",
|
||||
text = self.search_term or "",
|
||||
callback = self.set_search
|
||||
)
|
||||
if key == "n":
|
||||
self.find_next(False)
|
||||
if key == "N":
|
||||
self.find_next(True)
|
||||
else:
|
||||
return super(self.__class__, self).keypress(size, key)
|
||||
|
||||
def set_search(self, text):
|
||||
self.search_term = text or None
|
||||
self.find_next(False)
|
||||
|
||||
def set_highlight(self, offset):
|
||||
if self.current_highlight is not None:
|
||||
old = self.body[self.current_highlight]
|
||||
self.body[self.current_highlight] = old.backup
|
||||
if offset is None:
|
||||
self.current_highlight = None
|
||||
else:
|
||||
self.body[offset] = Highlight(self.body[offset])
|
||||
self.current_highlight = offset
|
||||
|
||||
def get_text(self, w):
|
||||
if isinstance(w, urwid.Text):
|
||||
return w.text
|
||||
elif isinstance(w, Highlight):
|
||||
return w.backup.text
|
||||
else:
|
||||
return None
|
||||
|
||||
def find_next(self, backwards):
|
||||
if not self.search_term:
|
||||
self.set_highlight(None)
|
||||
return
|
||||
# Start search at focus + 1
|
||||
if backwards:
|
||||
rng = xrange(len(self.body)-1, -1, -1)
|
||||
else:
|
||||
rng = xrange(1, len(self.body))
|
||||
for i in rng:
|
||||
off = (self.focus_position + i)%len(self.body)
|
||||
w = self.body[off]
|
||||
txt = self.get_text(w)
|
||||
if txt and self.search_term in txt:
|
||||
self.set_highlight(off)
|
||||
self.set_focus(off, coming_from="above")
|
||||
self.body._modified()
|
||||
return
|
||||
else:
|
||||
self.set_highlight(None)
|
||||
signals.status_message.send(message="Search not found.", expire=1)
|
Loading…
Reference in New Issue
Block a user