mirror of
https://github.com/mitmproxy/mitmproxy.git
synced 2024-11-25 06:09:50 +00:00
Refactor status bar prompting to use signal system
This commit is contained in:
parent
381a563067
commit
89383e9c13
@ -272,7 +272,7 @@ class ConsoleMaster(flow.FlowMaster):
|
||||
self.eventlog = not self.eventlog
|
||||
self.view_flowlist()
|
||||
|
||||
def _readflow(self, paths):
|
||||
def _readflows(self, path):
|
||||
"""
|
||||
Utitility function that reads a list of flows
|
||||
or prints an error to the UI if that fails.
|
||||
@ -281,7 +281,7 @@ class ConsoleMaster(flow.FlowMaster):
|
||||
- a list of flows, otherwise.
|
||||
"""
|
||||
try:
|
||||
return flow.read_flows_from_paths(paths)
|
||||
return flow.read_flows_from_paths([path])
|
||||
except flow.FlowReadError as e:
|
||||
if not self.statusbar:
|
||||
print >> sys.stderr, e.strerror
|
||||
@ -291,12 +291,12 @@ class ConsoleMaster(flow.FlowMaster):
|
||||
return None
|
||||
|
||||
def client_playback_path(self, path):
|
||||
flows = self._readflow(path)
|
||||
flows = self._readflows(path)
|
||||
if flows:
|
||||
self.start_client_playback(flows, False)
|
||||
|
||||
def server_playback_path(self, path):
|
||||
flows = self._readflow(path)
|
||||
flows = self._readflows(path)
|
||||
if flows:
|
||||
self.start_server_playback(
|
||||
flows,
|
||||
@ -387,7 +387,6 @@ class ConsoleMaster(flow.FlowMaster):
|
||||
self.header = None
|
||||
self.body = None
|
||||
self.help_context = None
|
||||
self.prompting = False
|
||||
self.onekey = False
|
||||
self.loop = urwid.MainLoop(
|
||||
self.view,
|
||||
@ -538,55 +537,6 @@ class ConsoleMaster(flow.FlowMaster):
|
||||
self.sync_list_view()
|
||||
return reterr
|
||||
|
||||
def path_prompt(self, prompt, text, callback, *args):
|
||||
self.statusbar.path_prompt(prompt, text)
|
||||
self.view.set_focus("footer")
|
||||
self.prompting = (callback, args)
|
||||
|
||||
def prompt(self, prompt, text, callback, *args):
|
||||
self.statusbar.prompt(prompt, text)
|
||||
self.view.set_focus("footer")
|
||||
self.prompting = (callback, args)
|
||||
|
||||
def prompt_edit(self, prompt, text, callback):
|
||||
self.statusbar.prompt(prompt + ": ", text)
|
||||
self.view.set_focus("footer")
|
||||
self.prompting = (callback, [])
|
||||
|
||||
def prompt_onekey(self, prompt, keys, callback, *args):
|
||||
"""
|
||||
Keys are a set of (word, key) tuples. The appropriate key in the
|
||||
word is highlighted.
|
||||
"""
|
||||
prompt = [prompt, " ("]
|
||||
mkup = []
|
||||
for i, e in enumerate(keys):
|
||||
mkup.extend(common.highlight_key(e[0], e[1]))
|
||||
if i < len(keys)-1:
|
||||
mkup.append(",")
|
||||
prompt.extend(mkup)
|
||||
prompt.append(")? ")
|
||||
self.onekey = set(i[1] for i in keys)
|
||||
self.prompt(prompt, "", callback, *args)
|
||||
|
||||
def prompt_done(self):
|
||||
self.prompting = False
|
||||
self.onekey = False
|
||||
self.view.set_focus("body")
|
||||
signals.status_message.send(message="")
|
||||
|
||||
def prompt_execute(self, txt=None):
|
||||
if not txt:
|
||||
txt = self.statusbar.get_edit_text()
|
||||
p, args = self.prompting
|
||||
self.prompt_done()
|
||||
msg = p(txt, *args)
|
||||
if msg:
|
||||
signals.status_message.send(message=msg, expire=1)
|
||||
|
||||
def prompt_cancel(self):
|
||||
self.prompt_done()
|
||||
|
||||
def accept_all(self):
|
||||
self.state.accept_all(self)
|
||||
|
||||
|
@ -203,13 +203,11 @@ def save_data(path, data, master, state):
|
||||
|
||||
|
||||
def ask_save_path(prompt, data, master, state):
|
||||
master.path_prompt(
|
||||
prompt,
|
||||
state.last_saveload,
|
||||
save_data,
|
||||
data,
|
||||
master,
|
||||
state
|
||||
signals.status_path_prompt.send(
|
||||
prompt = prompt,
|
||||
text = state.last_saveload,
|
||||
callback = save_data,
|
||||
args = (data, master, state)
|
||||
)
|
||||
|
||||
|
||||
@ -263,14 +261,13 @@ def copy_flow(part, scope, flow, master, state):
|
||||
def save(k):
|
||||
if k == "y":
|
||||
ask_save_path("Save data: ", data, master, state)
|
||||
|
||||
master.prompt_onekey(
|
||||
"Cannot copy binary data to clipboard. Save as file?",
|
||||
(
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Cannot copy binary data to clipboard. Save as file?",
|
||||
keys = (
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
),
|
||||
save
|
||||
callback = save
|
||||
)
|
||||
|
||||
|
||||
@ -282,14 +279,11 @@ def ask_copy_part(scope, flow, master, state):
|
||||
if scope != "s":
|
||||
choices.append(("url", "u"))
|
||||
|
||||
master.prompt_onekey(
|
||||
"Copy",
|
||||
choices,
|
||||
copy_flow,
|
||||
scope,
|
||||
flow,
|
||||
master,
|
||||
state
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Copy",
|
||||
keys = choices,
|
||||
callback = copy_flow,
|
||||
args = (scope, flow, master, state)
|
||||
)
|
||||
|
||||
|
||||
@ -306,16 +300,14 @@ def ask_save_body(part, master, state, flow):
|
||||
# We first need to determine whether we want to save the request or the
|
||||
# response content.
|
||||
if request_has_content and response_has_content:
|
||||
master.prompt_onekey(
|
||||
"Save",
|
||||
(
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Save",
|
||||
keys = (
|
||||
("request", "q"),
|
||||
("response", "s"),
|
||||
),
|
||||
ask_save_body,
|
||||
master,
|
||||
state,
|
||||
flow
|
||||
callback = ask_save_body,
|
||||
args = (master, state, flow)
|
||||
)
|
||||
elif response_has_content:
|
||||
ask_save_body("s", master, state, flow)
|
||||
|
@ -111,17 +111,17 @@ class ConnectionItem(urwid.WidgetWrap):
|
||||
|
||||
def save_flows_prompt(self, k):
|
||||
if k == "a":
|
||||
self.master.path_prompt(
|
||||
"Save all flows to: ",
|
||||
self.state.last_saveload,
|
||||
self.master.save_flows
|
||||
signals.status_path_prompt.send(
|
||||
prompt = "Save all flows to: ",
|
||||
text = self.state.last_saveload,
|
||||
callback = self.master.save_flows
|
||||
)
|
||||
else:
|
||||
self.master.path_prompt(
|
||||
"Save this flow to: ",
|
||||
self.state.last_saveload,
|
||||
self.master.save_one_flow,
|
||||
self.flow
|
||||
signals.status_path_prompt.send(
|
||||
prompt = "Save this flow to: ",
|
||||
text = self.state.last_saveload,
|
||||
callback = self.master.save_one_flow,
|
||||
args = (self.flow,)
|
||||
)
|
||||
|
||||
def stop_server_playback_prompt(self, a):
|
||||
@ -150,10 +150,10 @@ class ConnectionItem(urwid.WidgetWrap):
|
||||
self.master.options.replay_ignore_host
|
||||
)
|
||||
else:
|
||||
self.master.path_prompt(
|
||||
"Server replay path: ",
|
||||
self.state.last_saveload,
|
||||
self.master.server_playback_path
|
||||
signals.status_path_prompt.send(
|
||||
prompt = "Server replay path: ",
|
||||
text = self.state.last_saveload,
|
||||
callback = self.master.server_playback_path
|
||||
)
|
||||
|
||||
def keypress(self, (maxcol,), key):
|
||||
@ -175,23 +175,23 @@ class ConnectionItem(urwid.WidgetWrap):
|
||||
self.master.sync_list_view()
|
||||
elif key == "S":
|
||||
if not self.master.server_playback:
|
||||
self.master.prompt_onekey(
|
||||
"Server Replay",
|
||||
(
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Server Replay",
|
||||
keys = (
|
||||
("all flows", "a"),
|
||||
("this flow", "t"),
|
||||
("file", "f"),
|
||||
),
|
||||
self.server_replay_prompt,
|
||||
callback = self.server_replay_prompt,
|
||||
)
|
||||
else:
|
||||
self.master.prompt_onekey(
|
||||
"Stop current server replay?",
|
||||
(
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Stop current server replay?",
|
||||
keys = (
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
),
|
||||
self.stop_server_playback_prompt,
|
||||
callback = self.stop_server_playback_prompt,
|
||||
)
|
||||
elif key == "V":
|
||||
if not self.flow.modified():
|
||||
@ -201,13 +201,14 @@ class ConnectionItem(urwid.WidgetWrap):
|
||||
self.master.sync_list_view()
|
||||
signals.status_message.send(message="Reverted.")
|
||||
elif key == "w":
|
||||
self.master.prompt_onekey(
|
||||
"Save",
|
||||
(
|
||||
signals.status_prompt_onekey.send(
|
||||
self,
|
||||
prompt = "Save",
|
||||
keys = (
|
||||
("all flows", "a"),
|
||||
("this flow", "t"),
|
||||
),
|
||||
self.save_flows_prompt,
|
||||
callback = self.save_flows_prompt,
|
||||
)
|
||||
elif key == "X":
|
||||
self.flow.kill(self.master)
|
||||
@ -215,11 +216,11 @@ class ConnectionItem(urwid.WidgetWrap):
|
||||
if self.flow.request:
|
||||
self.master.view_flow(self.flow)
|
||||
elif key == "|":
|
||||
self.master.path_prompt(
|
||||
"Send flow to script: ",
|
||||
self.state.last_script,
|
||||
self.master.run_script_once,
|
||||
self.flow
|
||||
signals.status_path_prompt.send(
|
||||
prompt = "Send flow to script: ",
|
||||
text = self.state.last_script,
|
||||
callback = self.master.run_script_once,
|
||||
args = (self.flow,)
|
||||
)
|
||||
elif key == "g":
|
||||
common.ask_copy_part("a", self.flow, self.master, self.state)
|
||||
@ -266,7 +267,12 @@ class FlowListBox(urwid.ListBox):
|
||||
|
||||
def get_method(self, k):
|
||||
if k == "e":
|
||||
self.master.prompt("Method:", "", self.get_method_raw)
|
||||
signals.status_prompt.send(
|
||||
self,
|
||||
prompt = "Method:",
|
||||
text = "",
|
||||
callback = self.get_method_raw
|
||||
)
|
||||
else:
|
||||
method = ""
|
||||
for i in common.METHOD_OPTIONS:
|
||||
@ -275,11 +281,11 @@ class FlowListBox(urwid.ListBox):
|
||||
self.get_url(method)
|
||||
|
||||
def get_url(self, method):
|
||||
self.master.prompt(
|
||||
"URL:",
|
||||
"http://www.example.com/",
|
||||
self.new_request,
|
||||
method
|
||||
signals.status_prompt.send(
|
||||
prompt = "URL:",
|
||||
text = "http://www.example.com/",
|
||||
callback = self.new_request,
|
||||
args = (method,)
|
||||
)
|
||||
|
||||
def new_request(self, url, method):
|
||||
@ -301,22 +307,23 @@ class FlowListBox(urwid.ListBox):
|
||||
elif key == "e":
|
||||
self.master.toggle_eventlog()
|
||||
elif key == "l":
|
||||
self.master.prompt(
|
||||
"Limit: ",
|
||||
self.master.state.limit_txt,
|
||||
self.master.set_limit
|
||||
signals.status_prompt.send(
|
||||
prompt = "Limit: ",
|
||||
text = self.master.state.limit_txt,
|
||||
callback = self.master.set_limit
|
||||
)
|
||||
elif key == "L":
|
||||
self.master.path_prompt(
|
||||
"Load flows: ",
|
||||
self.master.state.last_saveload,
|
||||
self.master.load_flows_callback
|
||||
signals.status_path_prompt.send(
|
||||
self,
|
||||
prompt = "Load flows: ",
|
||||
text = self.master.state.last_saveload,
|
||||
callback = self.master.load_flows_callback
|
||||
)
|
||||
elif key == "n":
|
||||
self.master.prompt_onekey(
|
||||
"Method",
|
||||
common.METHOD_OPTIONS,
|
||||
self.get_method
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Method",
|
||||
keys = common.METHOD_OPTIONS,
|
||||
callback = self.get_method
|
||||
)
|
||||
elif key == "F":
|
||||
self.master.toggle_follow_flows()
|
||||
@ -324,10 +331,11 @@ class FlowListBox(urwid.ListBox):
|
||||
if self.master.stream:
|
||||
self.master.stop_stream()
|
||||
else:
|
||||
self.master.path_prompt(
|
||||
"Stream flows to: ",
|
||||
self.master.state.last_saveload,
|
||||
self.master.start_stream_to_path
|
||||
signals.status_path_prompt.send(
|
||||
self,
|
||||
prompt = "Stream flows to: ",
|
||||
text = self.master.state.last_saveload,
|
||||
callback = self.master.start_stream_to_path
|
||||
)
|
||||
else:
|
||||
return urwid.ListBox.keypress(self, size, key)
|
||||
|
@ -492,7 +492,11 @@ class FlowView(urwid.WidgetWrap):
|
||||
|
||||
def edit_method(self, m):
|
||||
if m == "e":
|
||||
self.master.prompt_edit("Method", self.flow.request.method, self.set_method_raw)
|
||||
signals.status_prompt.send(
|
||||
prompt = "Method: ",
|
||||
text = self.flow.request.method,
|
||||
callback = self.set_method_raw
|
||||
)
|
||||
else:
|
||||
for i in common.METHOD_OPTIONS:
|
||||
if i[1] == m:
|
||||
@ -567,14 +571,14 @@ class FlowView(urwid.WidgetWrap):
|
||||
message.content = c.rstrip("\n")
|
||||
elif part == "f":
|
||||
if not message.get_form_urlencoded() and message.content:
|
||||
self.master.prompt_onekey(
|
||||
"Existing body is not a URL-encoded form. Clear and edit?",
|
||||
[
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Existing body is not a URL-encoded form. Clear and edit?",
|
||||
keys = [
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
],
|
||||
self.edit_form_confirm,
|
||||
message
|
||||
callback = self.edit_form_confirm,
|
||||
args = (message,)
|
||||
)
|
||||
else:
|
||||
self.edit_form(message)
|
||||
@ -587,13 +591,29 @@ class FlowView(urwid.WidgetWrap):
|
||||
elif part == "q":
|
||||
self.master.view_grideditor(grideditor.QueryEditor(self.master, message.get_query().lst, self.set_query, message))
|
||||
elif part == "u" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
||||
self.master.prompt_edit("URL", message.url, self.set_url)
|
||||
signals.status_prompt.send(
|
||||
prompt = "URL: ",
|
||||
text = message.url,
|
||||
callback = self.set_url
|
||||
)
|
||||
elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
||||
self.master.prompt_onekey("Method", common.METHOD_OPTIONS, self.edit_method)
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Method",
|
||||
keys = common.METHOD_OPTIONS,
|
||||
callback = self.edit_method
|
||||
)
|
||||
elif part == "c" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
|
||||
self.master.prompt_edit("Code", str(message.code), self.set_resp_code)
|
||||
signals.status_prompt.send(
|
||||
prompt = "Code: ",
|
||||
text = str(message.code),
|
||||
callback = self.set_resp_code
|
||||
)
|
||||
elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
|
||||
self.master.prompt_edit("Message", message.msg, self.set_resp_msg)
|
||||
signals.status_prompt.send(
|
||||
prompt = "Message: ",
|
||||
text = message.msg,
|
||||
callback = self.set_resp_msg
|
||||
)
|
||||
self.master.refresh_flow(self.flow)
|
||||
|
||||
def _view_nextprev_flow(self, np, flow):
|
||||
@ -684,9 +704,9 @@ class FlowView(urwid.WidgetWrap):
|
||||
signals.status_message.send(message="Duplicated.")
|
||||
elif key == "e":
|
||||
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
||||
self.master.prompt_onekey(
|
||||
"Edit request",
|
||||
(
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Edit request",
|
||||
keys = (
|
||||
("query", "q"),
|
||||
("path", "p"),
|
||||
("url", "u"),
|
||||
@ -695,18 +715,18 @@ class FlowView(urwid.WidgetWrap):
|
||||
("raw body", "r"),
|
||||
("method", "m"),
|
||||
),
|
||||
self.edit
|
||||
callback = self.edit
|
||||
)
|
||||
else:
|
||||
self.master.prompt_onekey(
|
||||
"Edit response",
|
||||
(
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Edit response",
|
||||
keys = (
|
||||
("code", "c"),
|
||||
("message", "m"),
|
||||
("header", "h"),
|
||||
("raw body", "r"),
|
||||
),
|
||||
self.edit
|
||||
callback = self.edit
|
||||
)
|
||||
key = None
|
||||
elif key == "f":
|
||||
@ -727,10 +747,11 @@ class FlowView(urwid.WidgetWrap):
|
||||
elif key == "m":
|
||||
p = list(contentview.view_prompts)
|
||||
p.insert(0, ("Clear", "C"))
|
||||
self.master.prompt_onekey(
|
||||
"Display mode",
|
||||
p,
|
||||
self.change_this_display_mode
|
||||
signals.status_prompt_onekey.send(
|
||||
self,
|
||||
prompt = "Display mode",
|
||||
keys = p,
|
||||
callback = self.change_this_display_mode
|
||||
)
|
||||
key = None
|
||||
elif key == "p":
|
||||
@ -748,11 +769,11 @@ class FlowView(urwid.WidgetWrap):
|
||||
self.master.refresh_flow(self.flow)
|
||||
signals.status_message.send(message="Reverted.")
|
||||
elif key == "W":
|
||||
self.master.path_prompt(
|
||||
"Save this flow: ",
|
||||
self.state.last_saveload,
|
||||
self.master.save_one_flow,
|
||||
self.flow
|
||||
signals.status_path_prompt.send(
|
||||
prompt = "Save this flow: ",
|
||||
text = self.state.last_saveload,
|
||||
callback = self.master.save_one_flow,
|
||||
args = (self.flow,)
|
||||
)
|
||||
elif key == "v":
|
||||
if conn and conn.content:
|
||||
@ -763,18 +784,20 @@ class FlowView(urwid.WidgetWrap):
|
||||
else:
|
||||
signals.status_message.send(message="Error! Set $EDITOR or $PAGER.")
|
||||
elif key == "|":
|
||||
self.master.path_prompt(
|
||||
"Send flow to script: ", self.state.last_script,
|
||||
self.master.run_script_once, self.flow
|
||||
signals.status_path_prompt.send(
|
||||
prompt = "Send flow to script: ",
|
||||
text = self.state.last_script,
|
||||
callback = self.master.run_script_once,
|
||||
args = (self.flow,)
|
||||
)
|
||||
elif key == "x":
|
||||
self.master.prompt_onekey(
|
||||
"Delete body",
|
||||
(
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Delete body",
|
||||
keys = (
|
||||
("completely", "c"),
|
||||
("mark as missing", "m"),
|
||||
),
|
||||
self.delete_body
|
||||
callback = self.delete_body
|
||||
)
|
||||
key = None
|
||||
elif key == "X":
|
||||
@ -787,22 +810,24 @@ class FlowView(urwid.WidgetWrap):
|
||||
if not conn.decode():
|
||||
signals.status_message.send(message="Could not decode - invalid data?")
|
||||
else:
|
||||
self.master.prompt_onekey(
|
||||
"Select encoding: ",
|
||||
(
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Select encoding: ",
|
||||
keys = (
|
||||
("gzip", "z"),
|
||||
("deflate", "d"),
|
||||
),
|
||||
self.encode_callback,
|
||||
conn
|
||||
callback = self.encode_callback,
|
||||
args = (conn,)
|
||||
)
|
||||
self.master.refresh_flow(self.flow)
|
||||
elif key == "/":
|
||||
last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
|
||||
search_prompt = "Search body ["+last_search_string+"]: " if last_search_string else "Search body: "
|
||||
self.master.prompt(search_prompt,
|
||||
None,
|
||||
self.search)
|
||||
signals.status_prompt.send(
|
||||
prompt = search_prompt,
|
||||
text = "",
|
||||
callback = self.search
|
||||
)
|
||||
elif key == "n":
|
||||
self.search_again(backwards=False)
|
||||
elif key == "N":
|
||||
|
@ -338,11 +338,20 @@ class GridEditor(urwid.WidgetWrap):
|
||||
self.walker.delete_focus()
|
||||
elif key == "r":
|
||||
if self.walker.get_current_value() is not None:
|
||||
self.master.path_prompt("Read file: ", "", self.read_file)
|
||||
signals.status_path_prompt.send(
|
||||
self,
|
||||
prompt = "Read file: ",
|
||||
text = "",
|
||||
callback = self.read_file
|
||||
)
|
||||
elif key == "R":
|
||||
if self.walker.get_current_value() is not None:
|
||||
self.master.path_prompt(
|
||||
"Read unescaped file: ", "", self.read_file, True
|
||||
signals.status_path_prompt.send(
|
||||
self,
|
||||
prompt = "Read unescaped file: ",
|
||||
text = "",
|
||||
callback = self.read_file,
|
||||
args = (True,)
|
||||
)
|
||||
elif key == "e":
|
||||
o = self.walker.get_current_value()
|
||||
@ -431,10 +440,10 @@ class HeaderEditor(GridEditor):
|
||||
|
||||
def handle_key(self, key):
|
||||
if key == "U":
|
||||
self.master.prompt_onekey(
|
||||
"Add User-Agent header:",
|
||||
[(i[0], i[1]) for i in http_uastrings.UASTRINGS],
|
||||
self.set_user_agent,
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Add User-Agent header:",
|
||||
keys = [(i[0], i[1]) for i in http_uastrings.UASTRINGS],
|
||||
callback = self.set_user_agent,
|
||||
)
|
||||
return True
|
||||
|
||||
@ -500,10 +509,10 @@ class SetHeadersEditor(GridEditor):
|
||||
|
||||
def handle_key(self, key):
|
||||
if key == "U":
|
||||
self.master.prompt_onekey(
|
||||
"Add User-Agent header:",
|
||||
[(i[0], i[1]) for i in http_uastrings.UASTRINGS],
|
||||
self.set_user_agent,
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Add User-Agent header:",
|
||||
keys = [(i[0], i[1]) for i in http_uastrings.UASTRINGS],
|
||||
callback = self.set_user_agent,
|
||||
)
|
||||
return True
|
||||
|
||||
|
@ -1,5 +1,19 @@
|
||||
|
||||
import blinker
|
||||
|
||||
# Show a status message in the action bar
|
||||
status_message = blinker.Signal()
|
||||
|
||||
# Prompt for input
|
||||
status_prompt = blinker.Signal()
|
||||
|
||||
# Prompt for a path
|
||||
status_path_prompt = blinker.Signal()
|
||||
|
||||
# Prompt for a single keystroke
|
||||
status_prompt_onekey = blinker.Signal()
|
||||
|
||||
# Call a callback in N seconds
|
||||
call_in = blinker.Signal()
|
||||
|
||||
# Focus the body, footer or header of the main window
|
||||
focus = blinker.Signal()
|
||||
|
@ -2,7 +2,7 @@ import time
|
||||
|
||||
import urwid
|
||||
|
||||
from . import pathedit, signals
|
||||
from . import pathedit, signals, common
|
||||
from .. import utils
|
||||
|
||||
|
||||
@ -11,18 +11,12 @@ class ActionBar(urwid.WidgetWrap):
|
||||
urwid.WidgetWrap.__init__(self, None)
|
||||
self.clear()
|
||||
signals.status_message.connect(self.sig_message)
|
||||
signals.status_prompt.connect(self.sig_prompt)
|
||||
signals.status_path_prompt.connect(self.sig_path_prompt)
|
||||
signals.status_prompt_onekey.connect(self.sig_prompt_onekey)
|
||||
|
||||
def clear(self):
|
||||
self._w = urwid.Text("")
|
||||
|
||||
def selectable(self):
|
||||
return True
|
||||
|
||||
def path_prompt(self, prompt, text):
|
||||
self._w = pathedit.PathEdit(prompt, text)
|
||||
|
||||
def prompt(self, prompt, text = ""):
|
||||
self._w = urwid.Edit(prompt, text or "")
|
||||
self.prompting = False
|
||||
self.onekey = False
|
||||
|
||||
def sig_message(self, sender, message, expire=None):
|
||||
w = urwid.Text(message)
|
||||
@ -33,6 +27,72 @@ class ActionBar(urwid.WidgetWrap):
|
||||
self.clear()
|
||||
signals.call_in.send(seconds=expire, callback=cb)
|
||||
|
||||
def sig_prompt(self, sender, prompt, text, callback, args=()):
|
||||
signals.focus.send(self, section="footer")
|
||||
self._w = urwid.Edit(prompt, text or "")
|
||||
self.prompting = (callback, args)
|
||||
|
||||
def sig_path_prompt(self, sender, prompt, text, callback, args=()):
|
||||
signals.focus.send(self, section="footer")
|
||||
self._w = pathedit.PathEdit(prompt, text)
|
||||
self.prompting = (callback, args)
|
||||
|
||||
def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
|
||||
"""
|
||||
Keys are a set of (word, key) tuples. The appropriate key in the
|
||||
word is highlighted.
|
||||
"""
|
||||
signals.focus.send(self, section="footer")
|
||||
prompt = [prompt, " ("]
|
||||
mkup = []
|
||||
for i, e in enumerate(keys):
|
||||
mkup.extend(common.highlight_key(e[0], e[1]))
|
||||
if i < len(keys)-1:
|
||||
mkup.append(",")
|
||||
prompt.extend(mkup)
|
||||
prompt.append(")? ")
|
||||
self.onekey = set(i[1] for i in keys)
|
||||
self._w = urwid.Edit(prompt, "")
|
||||
self.prompting = (callback, args)
|
||||
|
||||
def selectable(self):
|
||||
return True
|
||||
|
||||
def keypress(self, size, k):
|
||||
if self.prompting:
|
||||
if k == "esc":
|
||||
self.prompt_done()
|
||||
elif self.onekey:
|
||||
if k == "enter":
|
||||
self.prompt_done()
|
||||
elif k in self.onekey:
|
||||
self.prompt_execute(k)
|
||||
elif k == "enter":
|
||||
self.prompt_execute()
|
||||
else:
|
||||
if common.is_keypress(k):
|
||||
self._w.keypress(size, k)
|
||||
else:
|
||||
return k
|
||||
|
||||
def clear(self):
|
||||
self._w = urwid.Text("")
|
||||
|
||||
def prompt_done(self):
|
||||
self.prompting = False
|
||||
self.onekey = False
|
||||
signals.status_message.send(message="")
|
||||
signals.focus.send(self, section="body")
|
||||
|
||||
def prompt_execute(self, txt=None):
|
||||
if not txt:
|
||||
txt = self._w.get_edit_text()
|
||||
p, args = self.prompting
|
||||
self.prompt_done()
|
||||
msg = p(txt, *args)
|
||||
if msg:
|
||||
signals.status_message.send(message=msg, expire=1)
|
||||
|
||||
|
||||
class StatusBar(urwid.WidgetWrap):
|
||||
def __init__(self, master, helptext):
|
||||
|
@ -1,151 +1,144 @@
|
||||
import urwid
|
||||
from . import common, grideditor
|
||||
from . import common, grideditor, signals, contentview
|
||||
|
||||
class Window(urwid.Frame):
|
||||
def __init__(self, master, body, header, footer):
|
||||
urwid.Frame.__init__(self, body, header=header, footer=footer)
|
||||
self.master = master
|
||||
signals.focus.connect(self.sig_focus)
|
||||
|
||||
def sig_focus(self, sender, section):
|
||||
self.focus_position = section
|
||||
|
||||
def keypress(self, size, k):
|
||||
if self.master.prompting:
|
||||
if k == "esc":
|
||||
self.master.prompt_cancel()
|
||||
elif self.master.onekey:
|
||||
if k == "enter":
|
||||
self.master.prompt_cancel()
|
||||
elif k in self.master.onekey:
|
||||
self.master.prompt_execute(k)
|
||||
elif k == "enter":
|
||||
self.master.prompt_execute()
|
||||
k = urwid.Frame.keypress(self, self.master.loop.screen_size, k)
|
||||
if k == "?":
|
||||
self.master.view_help()
|
||||
elif k == "c":
|
||||
if not self.master.client_playback:
|
||||
signals.status_path_prompt.send(
|
||||
self,
|
||||
prompt = "Client replay: ",
|
||||
text = self.master.state.last_saveload,
|
||||
callback = self.master.client_playback_path
|
||||
)
|
||||
else:
|
||||
if common.is_keypress(k):
|
||||
urwid.Frame.keypress(self, self.master.loop.screen_size, k)
|
||||
else:
|
||||
return k
|
||||
else:
|
||||
k = urwid.Frame.keypress(self, self.master.loop.screen_size, k)
|
||||
if k == "?":
|
||||
self.master.view_help()
|
||||
elif k == "c":
|
||||
if not self.master.client_playback:
|
||||
self.master.path_prompt(
|
||||
"Client replay: ",
|
||||
self.master.state.last_saveload,
|
||||
self.master.client_playback_path
|
||||
)
|
||||
else:
|
||||
self.master.prompt_onekey(
|
||||
"Stop current client replay?",
|
||||
(
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
),
|
||||
self.master.stop_client_playback_prompt,
|
||||
)
|
||||
elif k == "H":
|
||||
self.master.view_grideditor(
|
||||
grideditor.SetHeadersEditor(
|
||||
self.master,
|
||||
self.master.setheaders.get_specs(),
|
||||
self.master.setheaders.set
|
||||
)
|
||||
)
|
||||
elif k == "I":
|
||||
self.master.view_grideditor(
|
||||
grideditor.HostPatternEditor(
|
||||
self.master,
|
||||
[[x] for x in self.master.get_ignore_filter()],
|
||||
self.master.edit_ignore_filter
|
||||
)
|
||||
)
|
||||
elif k == "T":
|
||||
self.master.view_grideditor(
|
||||
grideditor.HostPatternEditor(
|
||||
self.master,
|
||||
[[x] for x in self.master.get_tcp_filter()],
|
||||
self.master.edit_tcp_filter
|
||||
)
|
||||
)
|
||||
elif k == "i":
|
||||
self.master.prompt(
|
||||
"Intercept filter: ",
|
||||
self.master.state.intercept_txt,
|
||||
self.master.set_intercept
|
||||
)
|
||||
elif k == "Q":
|
||||
raise urwid.ExitMainLoop
|
||||
elif k == "q":
|
||||
self.master.prompt_onekey(
|
||||
"Quit",
|
||||
(
|
||||
signals.status_prompt_onekey.send(
|
||||
self,
|
||||
prompt = "Stop current client replay?",
|
||||
keys = (
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
),
|
||||
self.master.quit,
|
||||
callback = self.master.stop_client_playback_prompt,
|
||||
)
|
||||
elif k == "M":
|
||||
self.master.prompt_onekey(
|
||||
"Global default display mode",
|
||||
contentview.view_prompts,
|
||||
self.master.change_default_display_mode
|
||||
elif k == "H":
|
||||
self.master.view_grideditor(
|
||||
grideditor.SetHeadersEditor(
|
||||
self.master,
|
||||
self.master.setheaders.get_specs(),
|
||||
self.master.setheaders.set
|
||||
)
|
||||
elif k == "R":
|
||||
self.master.view_grideditor(
|
||||
grideditor.ReplaceEditor(
|
||||
self.master,
|
||||
self.master.replacehooks.get_specs(),
|
||||
self.master.replacehooks.set
|
||||
)
|
||||
)
|
||||
elif k == "I":
|
||||
self.master.view_grideditor(
|
||||
grideditor.HostPatternEditor(
|
||||
self.master,
|
||||
[[x] for x in self.master.get_ignore_filter()],
|
||||
self.master.edit_ignore_filter
|
||||
)
|
||||
elif k == "s":
|
||||
self.master.view_grideditor(
|
||||
grideditor.ScriptEditor(
|
||||
self.master,
|
||||
[[i.command] for i in self.master.scripts],
|
||||
self.master.edit_scripts
|
||||
)
|
||||
)
|
||||
elif k == "T":
|
||||
self.master.view_grideditor(
|
||||
grideditor.HostPatternEditor(
|
||||
self.master,
|
||||
[[x] for x in self.master.get_tcp_filter()],
|
||||
self.master.edit_tcp_filter
|
||||
)
|
||||
elif k == "S":
|
||||
if not self.master.server_playback:
|
||||
self.master.path_prompt(
|
||||
"Server replay path: ",
|
||||
self.master.state.last_saveload,
|
||||
self.master.server_playback_path
|
||||
)
|
||||
else:
|
||||
self.master.prompt_onekey(
|
||||
"Stop current server replay?",
|
||||
(
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
),
|
||||
self.master.stop_server_playback_prompt,
|
||||
)
|
||||
elif k == "o":
|
||||
self.master.prompt_onekey(
|
||||
"Options",
|
||||
(
|
||||
("anticache", "a"),
|
||||
("anticomp", "c"),
|
||||
("showhost", "h"),
|
||||
("killextra", "k"),
|
||||
("norefresh", "n"),
|
||||
("no-upstream-certs", "u"),
|
||||
),
|
||||
self.master._change_options
|
||||
)
|
||||
elif k == "i":
|
||||
signals.status_prompt.send(
|
||||
self,
|
||||
prompt = "Intercept filter: ",
|
||||
text = self.master.state.intercept_txt,
|
||||
callback = self.master.set_intercept
|
||||
)
|
||||
elif k == "Q":
|
||||
raise urwid.ExitMainLoop
|
||||
elif k == "q":
|
||||
signals.status_prompt_onekey.send(
|
||||
self,
|
||||
prompt = "Quit",
|
||||
keys = (
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
),
|
||||
callback = self.master.quit,
|
||||
)
|
||||
elif k == "M":
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Global default display mode",
|
||||
keys = contentview.view_prompts,
|
||||
callback = self.master.change_default_display_mode
|
||||
)
|
||||
elif k == "R":
|
||||
self.master.view_grideditor(
|
||||
grideditor.ReplaceEditor(
|
||||
self.master,
|
||||
self.master.replacehooks.get_specs(),
|
||||
self.master.replacehooks.set
|
||||
)
|
||||
elif k == "t":
|
||||
self.master.prompt(
|
||||
"Sticky cookie filter: ",
|
||||
self.master.stickycookie_txt,
|
||||
self.master.set_stickycookie
|
||||
)
|
||||
elif k == "s":
|
||||
self.master.view_grideditor(
|
||||
grideditor.ScriptEditor(
|
||||
self.master,
|
||||
[[i.command] for i in self.master.scripts],
|
||||
self.master.edit_scripts
|
||||
)
|
||||
elif k == "u":
|
||||
self.master.prompt(
|
||||
"Sticky auth filter: ",
|
||||
self.master.stickyauth_txt,
|
||||
self.master.set_stickyauth
|
||||
)
|
||||
elif k == "S":
|
||||
if not self.master.server_playback:
|
||||
signals.status_path_prompt.send(
|
||||
self,
|
||||
prompt = "Server replay path: ",
|
||||
text = self.master.state.last_saveload,
|
||||
callback = self.master.server_playback_path
|
||||
)
|
||||
else:
|
||||
return k
|
||||
self.footer.redraw()
|
||||
signals.status_prompt_onekey.send(
|
||||
self,
|
||||
prompt = "Stop current server replay?",
|
||||
keys = (
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
),
|
||||
callback = self.master.stop_server_playback_prompt,
|
||||
)
|
||||
elif k == "o":
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Options",
|
||||
keys = (
|
||||
("anticache", "a"),
|
||||
("anticomp", "c"),
|
||||
("showhost", "h"),
|
||||
("killextra", "k"),
|
||||
("norefresh", "n"),
|
||||
("no-upstream-certs", "u"),
|
||||
),
|
||||
callback = self.master._change_options
|
||||
)
|
||||
elif k == "t":
|
||||
signals.status_prompt.send(
|
||||
prompt = "Sticky cookie filter: ",
|
||||
text = self.master.stickycookie_txt,
|
||||
callback = self.master.set_stickycookie
|
||||
)
|
||||
elif k == "u":
|
||||
signals.status_prompt.send(
|
||||
prompt = "Sticky auth filter: ",
|
||||
text = self.master.stickyauth_txt,
|
||||
callback = self.master.set_stickyauth
|
||||
)
|
||||
else:
|
||||
return k
|
||||
|
@ -2,8 +2,8 @@
|
||||
|
||||
# Generate a test pattern with pathoc
|
||||
PATHOD=http://localhost:9999
|
||||
pathoc localhost:8080 "get:'$PATHOD/p/200:p0,1:b@200b'"
|
||||
pathoc localhost:8080 "get:'$PATHOD/p/300:p0,1:b@200b'"
|
||||
pathoc localhost:8080 "get:'$PATHOD/p/400:p0,1:b@200b'"
|
||||
pathoc localhost:8080 "get:'$PATHOD/p/500:p0,1:b@200b'"
|
||||
pathoc localhost:8080 "get:'$PATHOD/p/600:p0,1:b@200b'"
|
||||
pathoc localhost:8080 "get:'$PATHOD/p/200:p0,1:b@200b':b@200b"
|
||||
pathoc localhost:8080 "get:'$PATHOD/p/300:p0,1:b@200b':b@200b"
|
||||
pathoc localhost:8080 "get:'$PATHOD/p/400:p0,1:b@200b':b@200b"
|
||||
pathoc localhost:8080 "get:'$PATHOD/p/500:p0,1:b@200b':b@200b"
|
||||
pathoc localhost:8080 "get:'$PATHOD/p/600:p0,1:b@200b':b@200b"
|
||||
|
Loading…
Reference in New Issue
Block a user