mirror of
https://gitee.com/openharmony/third_party_littlefs
synced 2024-11-23 06:50:37 +00:00
9bb47943d7
Signed-off-by: JKANG94 <446326515@qq.com>
1003 lines
30 KiB
Python
1003 lines
30 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# Display operations on block devices based on trace output
|
|
#
|
|
# Example:
|
|
# ./scripts/tracebd.py trace
|
|
#
|
|
# Copyright (c) 2022, The littlefs authors.
|
|
# SPDX-License-Identifier: BSD-3-Clause
|
|
#
|
|
|
|
import collections as co
|
|
import functools as ft
|
|
import io
|
|
import itertools as it
|
|
import math as m
|
|
import os
|
|
import re
|
|
import shutil
|
|
import threading as th
|
|
import time
|
|
|
|
|
|
CHARS = 'rpe.'
|
|
COLORS = ['42', '45', '44', '']
|
|
|
|
WEAR_CHARS = '0123456789'
|
|
WEAR_CHARS_SUBSCRIPTS = '.₁₂₃₄₅₆789'
|
|
WEAR_COLORS = ['', '', '', '', '', '', '', '35', '35', '1;31']
|
|
|
|
CHARS_DOTS = " .':"
|
|
COLORS_DOTS = ['32', '35', '34', '']
|
|
CHARS_BRAILLE = (
|
|
'⠀⢀⡀⣀⠠⢠⡠⣠⠄⢄⡄⣄⠤⢤⡤⣤' '⠐⢐⡐⣐⠰⢰⡰⣰⠔⢔⡔⣔⠴⢴⡴⣴'
|
|
'⠂⢂⡂⣂⠢⢢⡢⣢⠆⢆⡆⣆⠦⢦⡦⣦' '⠒⢒⡒⣒⠲⢲⡲⣲⠖⢖⡖⣖⠶⢶⡶⣶'
|
|
'⠈⢈⡈⣈⠨⢨⡨⣨⠌⢌⡌⣌⠬⢬⡬⣬' '⠘⢘⡘⣘⠸⢸⡸⣸⠜⢜⡜⣜⠼⢼⡼⣼'
|
|
'⠊⢊⡊⣊⠪⢪⡪⣪⠎⢎⡎⣎⠮⢮⡮⣮' '⠚⢚⡚⣚⠺⢺⡺⣺⠞⢞⡞⣞⠾⢾⡾⣾'
|
|
'⠁⢁⡁⣁⠡⢡⡡⣡⠅⢅⡅⣅⠥⢥⡥⣥' '⠑⢑⡑⣑⠱⢱⡱⣱⠕⢕⡕⣕⠵⢵⡵⣵'
|
|
'⠃⢃⡃⣃⠣⢣⡣⣣⠇⢇⡇⣇⠧⢧⡧⣧' '⠓⢓⡓⣓⠳⢳⡳⣳⠗⢗⡗⣗⠷⢷⡷⣷'
|
|
'⠉⢉⡉⣉⠩⢩⡩⣩⠍⢍⡍⣍⠭⢭⡭⣭' '⠙⢙⡙⣙⠹⢹⡹⣹⠝⢝⡝⣝⠽⢽⡽⣽'
|
|
'⠋⢋⡋⣋⠫⢫⡫⣫⠏⢏⡏⣏⠯⢯⡯⣯' '⠛⢛⡛⣛⠻⢻⡻⣻⠟⢟⡟⣟⠿⢿⡿⣿')
|
|
|
|
|
|
def openio(path, mode='r', buffering=-1):
|
|
# allow '-' for stdin/stdout
|
|
if path == '-':
|
|
if mode == 'r':
|
|
return os.fdopen(os.dup(sys.stdin.fileno()), mode, buffering)
|
|
else:
|
|
return os.fdopen(os.dup(sys.stdout.fileno()), mode, buffering)
|
|
else:
|
|
return open(path, mode, buffering)
|
|
|
|
class LinesIO:
|
|
def __init__(self, maxlen=None):
|
|
self.maxlen = maxlen
|
|
self.lines = co.deque(maxlen=maxlen)
|
|
self.tail = io.StringIO()
|
|
|
|
# trigger automatic sizing
|
|
if maxlen == 0:
|
|
self.resize(0)
|
|
|
|
def write(self, s):
|
|
# note using split here ensures the trailing string has no newline
|
|
lines = s.split('\n')
|
|
|
|
if len(lines) > 1 and self.tail.getvalue():
|
|
self.tail.write(lines[0])
|
|
lines[0] = self.tail.getvalue()
|
|
self.tail = io.StringIO()
|
|
|
|
self.lines.extend(lines[:-1])
|
|
|
|
if lines[-1]:
|
|
self.tail.write(lines[-1])
|
|
|
|
def resize(self, maxlen):
|
|
self.maxlen = maxlen
|
|
if maxlen == 0:
|
|
maxlen = shutil.get_terminal_size((80, 5))[1]
|
|
if maxlen != self.lines.maxlen:
|
|
self.lines = co.deque(self.lines, maxlen=maxlen)
|
|
|
|
canvas_lines = 1
|
|
def draw(self):
|
|
# did terminal size change?
|
|
if self.maxlen == 0:
|
|
self.resize(0)
|
|
|
|
# first thing first, give ourself a canvas
|
|
while LinesIO.canvas_lines < len(self.lines):
|
|
sys.stdout.write('\n')
|
|
LinesIO.canvas_lines += 1
|
|
|
|
# clear the bottom of the canvas if we shrink
|
|
shrink = LinesIO.canvas_lines - len(self.lines)
|
|
if shrink > 0:
|
|
for i in range(shrink):
|
|
sys.stdout.write('\r')
|
|
if shrink-1-i > 0:
|
|
sys.stdout.write('\x1b[%dA' % (shrink-1-i))
|
|
sys.stdout.write('\x1b[K')
|
|
if shrink-1-i > 0:
|
|
sys.stdout.write('\x1b[%dB' % (shrink-1-i))
|
|
sys.stdout.write('\x1b[%dA' % shrink)
|
|
LinesIO.canvas_lines = len(self.lines)
|
|
|
|
for i, line in enumerate(self.lines):
|
|
# move cursor, clear line, disable/reenable line wrapping
|
|
sys.stdout.write('\r')
|
|
if len(self.lines)-1-i > 0:
|
|
sys.stdout.write('\x1b[%dA' % (len(self.lines)-1-i))
|
|
sys.stdout.write('\x1b[K')
|
|
sys.stdout.write('\x1b[?7l')
|
|
sys.stdout.write(line)
|
|
sys.stdout.write('\x1b[?7h')
|
|
if len(self.lines)-1-i > 0:
|
|
sys.stdout.write('\x1b[%dB' % (len(self.lines)-1-i))
|
|
sys.stdout.flush()
|
|
|
|
|
|
# space filling Hilbert-curve
|
|
#
|
|
# note we memoize the last curve since this is a bit expensive
|
|
#
|
|
@ft.lru_cache(1)
|
|
def hilbert_curve(width, height):
|
|
# based on generalized Hilbert curves:
|
|
# https://github.com/jakubcerveny/gilbert
|
|
#
|
|
def hilbert_(x, y, a_x, a_y, b_x, b_y):
|
|
w = abs(a_x+a_y)
|
|
h = abs(b_x+b_y)
|
|
a_dx = -1 if a_x < 0 else +1 if a_x > 0 else 0
|
|
a_dy = -1 if a_y < 0 else +1 if a_y > 0 else 0
|
|
b_dx = -1 if b_x < 0 else +1 if b_x > 0 else 0
|
|
b_dy = -1 if b_y < 0 else +1 if b_y > 0 else 0
|
|
|
|
# trivial row
|
|
if h == 1:
|
|
for _ in range(w):
|
|
yield (x,y)
|
|
x, y = x+a_dx, y+a_dy
|
|
return
|
|
|
|
# trivial column
|
|
if w == 1:
|
|
for _ in range(h):
|
|
yield (x,y)
|
|
x, y = x+b_dx, y+b_dy
|
|
return
|
|
|
|
a_x_, a_y_ = a_x//2, a_y//2
|
|
b_x_, b_y_ = b_x//2, b_y//2
|
|
w_ = abs(a_x_+a_y_)
|
|
h_ = abs(b_x_+b_y_)
|
|
|
|
if 2*w > 3*h:
|
|
# prefer even steps
|
|
if w_ % 2 != 0 and w > 2:
|
|
a_x_, a_y_ = a_x_+a_dx, a_y_+a_dy
|
|
|
|
# split in two
|
|
yield from hilbert_(x, y, a_x_, a_y_, b_x, b_y)
|
|
yield from hilbert_(x+a_x_, y+a_y_, a_x-a_x_, a_y-a_y_, b_x, b_y)
|
|
else:
|
|
# prefer even steps
|
|
if h_ % 2 != 0 and h > 2:
|
|
b_x_, b_y_ = b_x_+b_dx, b_y_+b_dy
|
|
|
|
# split in three
|
|
yield from hilbert_(x, y, b_x_, b_y_, a_x_, a_y_)
|
|
yield from hilbert_(x+b_x_, y+b_y_, a_x, a_y, b_x-b_x_, b_y-b_y_)
|
|
yield from hilbert_(
|
|
x+(a_x-a_dx)+(b_x_-b_dx), y+(a_y-a_dy)+(b_y_-b_dy),
|
|
-b_x_, -b_y_, -(a_x-a_x_), -(a_y-a_y_))
|
|
|
|
if width >= height:
|
|
curve = hilbert_(0, 0, +width, 0, 0, +height)
|
|
else:
|
|
curve = hilbert_(0, 0, 0, +height, +width, 0)
|
|
|
|
return list(curve)
|
|
|
|
# space filling Z-curve/Lebesgue-curve
|
|
#
|
|
# note we memoize the last curve since this is a bit expensive
|
|
#
|
|
@ft.lru_cache(1)
|
|
def lebesgue_curve(width, height):
|
|
# we create a truncated Z-curve by simply filtering out the points
|
|
# that are outside our region
|
|
curve = []
|
|
for i in range(2**(2*m.ceil(m.log2(max(width, height))))):
|
|
# we just operate on binary strings here because it's easier
|
|
b = '{:0{}b}'.format(i, 2*m.ceil(m.log2(i+1)/2))
|
|
x = int(b[1::2], 2) if b[1::2] else 0
|
|
y = int(b[0::2], 2) if b[0::2] else 0
|
|
if x < width and y < height:
|
|
curve.append((x, y))
|
|
|
|
return curve
|
|
|
|
|
|
class Block(int):
|
|
__slots__ = ()
|
|
def __new__(cls, state=0, *,
|
|
wear=0,
|
|
readed=False,
|
|
proged=False,
|
|
erased=False):
|
|
return super().__new__(cls,
|
|
state
|
|
| (wear << 3)
|
|
| (1 if readed else 0)
|
|
| (2 if proged else 0)
|
|
| (4 if erased else 0))
|
|
|
|
@property
|
|
def wear(self):
|
|
return self >> 3
|
|
|
|
@property
|
|
def readed(self):
|
|
return (self & 1) != 0
|
|
|
|
@property
|
|
def proged(self):
|
|
return (self & 2) != 0
|
|
|
|
@property
|
|
def erased(self):
|
|
return (self & 4) != 0
|
|
|
|
def read(self):
|
|
return Block(int(self) | 1)
|
|
|
|
def prog(self):
|
|
return Block(int(self) | 2)
|
|
|
|
def erase(self):
|
|
return Block((int(self) | 4) + 8)
|
|
|
|
def clear(self):
|
|
return Block(int(self) & ~7)
|
|
|
|
def __or__(self, other):
|
|
return Block(
|
|
(int(self) | int(other)) & 7,
|
|
wear=max(self.wear, other.wear))
|
|
|
|
def worn(self, max_wear, *,
|
|
block_cycles=None,
|
|
wear_chars=None,
|
|
**_):
|
|
if wear_chars is None:
|
|
wear_chars = WEAR_CHARS
|
|
|
|
if block_cycles:
|
|
return self.wear / block_cycles
|
|
else:
|
|
return self.wear / max(max_wear, len(wear_chars))
|
|
|
|
def draw(self, max_wear, char=None, *,
|
|
read=True,
|
|
prog=True,
|
|
erase=True,
|
|
wear=False,
|
|
block_cycles=None,
|
|
color=True,
|
|
subscripts=False,
|
|
dots=False,
|
|
braille=False,
|
|
chars=None,
|
|
wear_chars=None,
|
|
colors=None,
|
|
wear_colors=None,
|
|
**_):
|
|
# fallback to default chars/colors
|
|
if chars is None:
|
|
chars = CHARS
|
|
if len(chars) < len(CHARS):
|
|
chars = chars + CHARS[len(chars):]
|
|
|
|
if colors is None:
|
|
if braille or dots:
|
|
colors = COLORS_DOTS
|
|
else:
|
|
colors = COLORS
|
|
if len(colors) < len(COLORS):
|
|
colors = colors + COLORS[len(colors):]
|
|
|
|
if wear_chars is None:
|
|
if subscripts:
|
|
wear_chars = WEAR_CHARS_SUBSCRIPTS
|
|
else:
|
|
wear_chars = WEAR_CHARS
|
|
|
|
if wear_colors is None:
|
|
wear_colors = WEAR_COLORS
|
|
|
|
# compute char/color
|
|
c = chars[3]
|
|
f = [colors[3]]
|
|
|
|
if wear:
|
|
w = min(
|
|
self.worn(
|
|
max_wear,
|
|
block_cycles=block_cycles,
|
|
wear_chars=wear_chars),
|
|
1)
|
|
|
|
c = wear_chars[int(w * (len(wear_chars)-1))]
|
|
f.append(wear_colors[int(w * (len(wear_colors)-1))])
|
|
|
|
if erase and self.erased:
|
|
c = chars[2]
|
|
f.append(colors[2])
|
|
elif prog and self.proged:
|
|
c = chars[1]
|
|
f.append(colors[1])
|
|
elif read and self.readed:
|
|
c = chars[0]
|
|
f.append(colors[0])
|
|
|
|
# override char?
|
|
if char:
|
|
c = char
|
|
|
|
# apply colors
|
|
if f and color:
|
|
c = '%s%s\x1b[m' % (
|
|
''.join('\x1b[%sm' % f_ for f_ in f),
|
|
c)
|
|
|
|
return c
|
|
|
|
|
|
class Bd:
|
|
def __init__(self, *,
|
|
size=1,
|
|
count=1,
|
|
width=None,
|
|
height=1,
|
|
blocks=None):
|
|
if width is None:
|
|
width = count
|
|
|
|
if blocks is None:
|
|
self.blocks = [Block() for _ in range(width*height)]
|
|
else:
|
|
self.blocks = blocks
|
|
self.size = size
|
|
self.count = count
|
|
self.width = width
|
|
self.height = height
|
|
|
|
def _op(self, f, block=None, off=None, size=None):
|
|
if block is None:
|
|
range_ = range(len(self.blocks))
|
|
else:
|
|
if off is None:
|
|
off, size = 0, self.size
|
|
elif size is None:
|
|
off, size = 0, off
|
|
|
|
# update our geometry? this will do nothing if we haven't changed
|
|
self.resize(
|
|
size=max(self.size, off+size),
|
|
count=max(self.count, block+1))
|
|
|
|
# map to our block space
|
|
start = (block*self.size + off) / (self.size*self.count)
|
|
stop = (block*self.size + off+size) / (self.size*self.count)
|
|
|
|
range_ = range(
|
|
m.floor(start*len(self.blocks)),
|
|
m.ceil(stop*len(self.blocks)))
|
|
|
|
# apply the op
|
|
for i in range_:
|
|
self.blocks[i] = f(self.blocks[i])
|
|
|
|
def read(self, block=None, off=None, size=None):
|
|
self._op(Block.read, block, off, size)
|
|
|
|
def prog(self, block=None, off=None, size=None):
|
|
self._op(Block.prog, block, off, size)
|
|
|
|
def erase(self, block=None, off=None, size=None):
|
|
self._op(Block.erase, block, off, size)
|
|
|
|
def clear(self, block=None, off=None, size=None):
|
|
self._op(Block.clear, block, off, size)
|
|
|
|
def copy(self):
|
|
return Bd(
|
|
blocks=self.blocks.copy(),
|
|
size=self.size,
|
|
count=self.count,
|
|
width=self.width,
|
|
height=self.height)
|
|
|
|
def resize(self, *,
|
|
size=None,
|
|
count=None,
|
|
width=None,
|
|
height=None):
|
|
size = size if size is not None else self.size
|
|
count = count if count is not None else self.count
|
|
width = width if width is not None else self.width
|
|
height = height if height is not None else self.height
|
|
|
|
if (size == self.size
|
|
and count == self.count
|
|
and width == self.width
|
|
and height == self.height):
|
|
return
|
|
|
|
# transform our blocks
|
|
blocks = []
|
|
for x in range(width*height):
|
|
# map from new bd space
|
|
start = m.floor(x * (size*count)/(width*height))
|
|
stop = m.ceil((x+1) * (size*count)/(width*height))
|
|
start_block = start // size
|
|
start_off = start % size
|
|
stop_block = stop // size
|
|
stop_off = stop % size
|
|
# map to old bd space
|
|
start = start_block*self.size + start_off
|
|
stop = stop_block*self.size + stop_off
|
|
start = m.floor(start * len(self.blocks)/(self.size*self.count))
|
|
stop = m.ceil(stop * len(self.blocks)/(self.size*self.count))
|
|
|
|
# aggregate state
|
|
blocks.append(ft.reduce(
|
|
Block.__or__,
|
|
self.blocks[start:stop],
|
|
Block()))
|
|
|
|
self.size = size
|
|
self.count = count
|
|
self.width = width
|
|
self.height = height
|
|
self.blocks = blocks
|
|
|
|
def draw(self, row, *,
|
|
read=False,
|
|
prog=False,
|
|
erase=False,
|
|
wear=False,
|
|
hilbert=False,
|
|
lebesgue=False,
|
|
dots=False,
|
|
braille=False,
|
|
**args):
|
|
# find max wear?
|
|
max_wear = None
|
|
if wear:
|
|
max_wear = max(b.wear for b in self.blocks)
|
|
|
|
# fold via a curve?
|
|
if hilbert:
|
|
grid = [None]*(self.width*self.height)
|
|
for (x,y), b in zip(
|
|
hilbert_curve(self.width, self.height),
|
|
self.blocks):
|
|
grid[x + y*self.width] = b
|
|
elif lebesgue:
|
|
grid = [None]*(self.width*self.height)
|
|
for (x,y), b in zip(
|
|
lebesgue_curve(self.width, self.height),
|
|
self.blocks):
|
|
grid[x + y*self.width] = b
|
|
else:
|
|
grid = self.blocks
|
|
|
|
# need to wait for more trace output before rendering
|
|
#
|
|
# this is sort of a hack that knows the output is going to a terminal
|
|
if (braille and self.height < 4) or (dots and self.height < 2):
|
|
needed_height = 4 if braille else 2
|
|
|
|
self.history = getattr(self, 'history', [])
|
|
self.history.append(grid)
|
|
|
|
if len(self.history)*self.height < needed_height:
|
|
# skip for now
|
|
return None
|
|
|
|
grid = list(it.chain.from_iterable(
|
|
# did we resize?
|
|
it.islice(it.chain(h, it.repeat(Block())),
|
|
self.width*self.height)
|
|
for h in self.history))
|
|
self.history = []
|
|
|
|
line = []
|
|
if braille:
|
|
# encode into a byte
|
|
for x in range(0, self.width, 2):
|
|
byte_b = 0
|
|
best_b = Block()
|
|
for i in range(2*4):
|
|
b = grid[x+(2-1-(i%2)) + ((row*4)+(4-1-(i//2)))*self.width]
|
|
best_b |= b
|
|
if ((read and b.readed)
|
|
or (prog and b.proged)
|
|
or (erase and b.erased)
|
|
or (not read and not prog and not erase
|
|
and wear and b.worn(max_wear, **args) >= 0.7)):
|
|
byte_b |= 1 << i
|
|
|
|
line.append(best_b.draw(
|
|
max_wear,
|
|
CHARS_BRAILLE[byte_b],
|
|
braille=True,
|
|
read=read,
|
|
prog=prog,
|
|
erase=erase,
|
|
wear=wear,
|
|
**args))
|
|
elif dots:
|
|
# encode into a byte
|
|
for x in range(self.width):
|
|
byte_b = 0
|
|
best_b = Block()
|
|
for i in range(2):
|
|
b = grid[x + ((row*2)+(2-1-i))*self.width]
|
|
best_b |= b
|
|
if ((read and b.readed)
|
|
or (prog and b.proged)
|
|
or (erase and b.erased)
|
|
or (not read and not prog and not erase
|
|
and wear and b.worn(max_wear, **args) >= 0.7)):
|
|
byte_b |= 1 << i
|
|
|
|
line.append(best_b.draw(
|
|
max_wear,
|
|
CHARS_DOTS[byte_b],
|
|
dots=True,
|
|
read=read,
|
|
prog=prog,
|
|
erase=erase,
|
|
wear=wear,
|
|
**args))
|
|
else:
|
|
for x in range(self.width):
|
|
line.append(grid[x + row*self.width].draw(
|
|
max_wear,
|
|
read=read,
|
|
prog=prog,
|
|
erase=erase,
|
|
wear=wear,
|
|
**args))
|
|
|
|
return ''.join(line)
|
|
|
|
|
|
|
|
def main(path='-', *,
|
|
read=False,
|
|
prog=False,
|
|
erase=False,
|
|
wear=False,
|
|
block=(None,None),
|
|
off=(None,None),
|
|
block_size=None,
|
|
block_count=None,
|
|
block_cycles=None,
|
|
reset=False,
|
|
color='auto',
|
|
dots=False,
|
|
braille=False,
|
|
width=None,
|
|
height=None,
|
|
lines=None,
|
|
cat=False,
|
|
hilbert=False,
|
|
lebesgue=False,
|
|
coalesce=None,
|
|
sleep=None,
|
|
keep_open=False,
|
|
**args):
|
|
# figure out what color should be
|
|
if color == 'auto':
|
|
color = sys.stdout.isatty()
|
|
elif color == 'always':
|
|
color = True
|
|
else:
|
|
color = False
|
|
|
|
# exclusive wear or read/prog/erase by default
|
|
if not read and not prog and not erase and not wear:
|
|
read = True
|
|
prog = True
|
|
erase = True
|
|
|
|
# assume a reasonable lines/height if not specified
|
|
#
|
|
# note that we let height = None if neither hilbert or lebesgue
|
|
# are specified, this is a bit special as the default may be less
|
|
# than one character in height.
|
|
if height is None and (hilbert or lebesgue):
|
|
if lines is not None:
|
|
height = lines
|
|
else:
|
|
height = 5
|
|
|
|
if lines is None:
|
|
if height is not None:
|
|
lines = height
|
|
else:
|
|
lines = 5
|
|
|
|
# allow ranges for blocks/offs
|
|
block_start = block[0]
|
|
block_stop = block[1] if len(block) > 1 else block[0]+1
|
|
off_start = off[0]
|
|
off_stop = off[1] if len(off) > 1 else off[0]+1
|
|
|
|
if block_start is None:
|
|
block_start = 0
|
|
if block_stop is None and block_count is not None:
|
|
block_stop = block_count
|
|
if off_start is None:
|
|
off_start = 0
|
|
if off_stop is None and block_size is not None:
|
|
off_stop = block_size
|
|
|
|
# create a block device representation
|
|
bd = Bd()
|
|
|
|
def resize(*, size=None, count=None):
|
|
nonlocal bd
|
|
|
|
# size may be overriden by cli args
|
|
if block_size is not None:
|
|
size = block_size
|
|
elif off_stop is not None:
|
|
size = off_stop-off_start
|
|
|
|
if block_count is not None:
|
|
count = block_count
|
|
elif block_stop is not None:
|
|
count = block_stop-block_start
|
|
|
|
# figure out best width/height
|
|
if width is None:
|
|
width_ = min(80, shutil.get_terminal_size((80, 5))[0])
|
|
elif width:
|
|
width_ = width
|
|
else:
|
|
width_ = shutil.get_terminal_size((80, 5))[0]
|
|
|
|
if height is None:
|
|
height_ = 0
|
|
elif height:
|
|
height_ = height
|
|
else:
|
|
height_ = shutil.get_terminal_size((80, 5))[1]
|
|
|
|
bd.resize(
|
|
size=size,
|
|
count=count,
|
|
# scale if we're printing with dots or braille
|
|
width=2*width_ if braille else width_,
|
|
height=max(1,
|
|
4*height_ if braille
|
|
else 2*height_ if dots
|
|
else height_))
|
|
resize()
|
|
|
|
# parse a line of trace output
|
|
pattern = re.compile(
|
|
'^(?P<file>[^:]*):(?P<line>[0-9]+):trace:.*?bd_(?:'
|
|
'(?P<create>create\w*)\('
|
|
'(?:'
|
|
'block_size=(?P<block_size>\w+)'
|
|
'|' 'block_count=(?P<block_count>\w+)'
|
|
'|' '.*?' ')*' '\)'
|
|
'|' '(?P<read>read)\('
|
|
'\s*(?P<read_ctx>\w+)' '\s*,'
|
|
'\s*(?P<read_block>\w+)' '\s*,'
|
|
'\s*(?P<read_off>\w+)' '\s*,'
|
|
'\s*(?P<read_buffer>\w+)' '\s*,'
|
|
'\s*(?P<read_size>\w+)' '\s*\)'
|
|
'|' '(?P<prog>prog)\('
|
|
'\s*(?P<prog_ctx>\w+)' '\s*,'
|
|
'\s*(?P<prog_block>\w+)' '\s*,'
|
|
'\s*(?P<prog_off>\w+)' '\s*,'
|
|
'\s*(?P<prog_buffer>\w+)' '\s*,'
|
|
'\s*(?P<prog_size>\w+)' '\s*\)'
|
|
'|' '(?P<erase>erase)\('
|
|
'\s*(?P<erase_ctx>\w+)' '\s*,'
|
|
'\s*(?P<erase_block>\w+)'
|
|
'\s*\(\s*(?P<erase_size>\w+)\s*\)' '\s*\)'
|
|
'|' '(?P<sync>sync)\('
|
|
'\s*(?P<sync_ctx>\w+)' '\s*\)' ')\s*$')
|
|
def parse(line):
|
|
nonlocal bd
|
|
|
|
# string searching is much faster than the regex here, and this
|
|
# actually has a big impact given how much trace output comes
|
|
# through here
|
|
if 'trace' not in line or 'bd' not in line:
|
|
return False
|
|
m = pattern.match(line)
|
|
if not m:
|
|
return False
|
|
|
|
if m.group('create'):
|
|
# update our block size/count
|
|
size = int(m.group('block_size'), 0)
|
|
count = int(m.group('block_count'), 0)
|
|
|
|
resize(size=size, count=count)
|
|
if reset:
|
|
bd = Bd(
|
|
size=bd.size,
|
|
count=bd.count,
|
|
width=bd.width,
|
|
height=bd.height)
|
|
return True
|
|
|
|
elif m.group('read') and read:
|
|
block = int(m.group('read_block'), 0)
|
|
off = int(m.group('read_off'), 0)
|
|
size = int(m.group('read_size'), 0)
|
|
|
|
if block_stop is not None and block >= block_stop:
|
|
return False
|
|
block -= block_start
|
|
if off_stop is not None:
|
|
if off >= off_stop:
|
|
return False
|
|
size = min(size, off_stop-off)
|
|
off -= off_start
|
|
|
|
bd.read(block, off, size)
|
|
return True
|
|
|
|
elif m.group('prog') and prog:
|
|
block = int(m.group('prog_block'), 0)
|
|
off = int(m.group('prog_off'), 0)
|
|
size = int(m.group('prog_size'), 0)
|
|
|
|
if block_stop is not None and block >= block_stop:
|
|
return False
|
|
block -= block_start
|
|
if off_stop is not None:
|
|
if off >= off_stop:
|
|
return False
|
|
size = min(size, off_stop-off)
|
|
off -= off_start
|
|
|
|
bd.prog(block, off, size)
|
|
return True
|
|
|
|
elif m.group('erase') and (erase or wear):
|
|
block = int(m.group('erase_block'), 0)
|
|
size = int(m.group('erase_size'), 0)
|
|
|
|
if block_stop is not None and block >= block_stop:
|
|
return False
|
|
block -= block_start
|
|
if off_stop is not None:
|
|
size = min(size, off_stop)
|
|
off = -off_start
|
|
|
|
bd.erase(block, off, size)
|
|
return True
|
|
|
|
else:
|
|
return False
|
|
|
|
# print trace output
|
|
def draw(f):
|
|
def writeln(s=''):
|
|
f.write(s)
|
|
f.write('\n')
|
|
f.writeln = writeln
|
|
|
|
# don't forget we've scaled this for braille/dots!
|
|
for row in range(
|
|
m.ceil(bd.height/4) if braille
|
|
else m.ceil(bd.height/2) if dots
|
|
else bd.height):
|
|
line = bd.draw(row,
|
|
read=read,
|
|
prog=prog,
|
|
erase=erase,
|
|
wear=wear,
|
|
block_cycles=block_cycles,
|
|
color=color,
|
|
dots=dots,
|
|
braille=braille,
|
|
hilbert=hilbert,
|
|
lebesgue=lebesgue,
|
|
**args)
|
|
if line:
|
|
f.writeln(line)
|
|
|
|
bd.clear()
|
|
resize()
|
|
|
|
|
|
# read/parse/coalesce operations
|
|
if cat:
|
|
ring = sys.stdout
|
|
else:
|
|
ring = LinesIO(lines)
|
|
|
|
# if sleep print in background thread to avoid getting stuck in a read call
|
|
event = th.Event()
|
|
lock = th.Lock()
|
|
if sleep:
|
|
done = False
|
|
def background():
|
|
while not done:
|
|
event.wait()
|
|
event.clear()
|
|
with lock:
|
|
draw(ring)
|
|
if not cat:
|
|
ring.draw()
|
|
time.sleep(sleep or 0.01)
|
|
th.Thread(target=background, daemon=True).start()
|
|
|
|
try:
|
|
while True:
|
|
with openio(path) as f:
|
|
changed = 0
|
|
for line in f:
|
|
with lock:
|
|
changed += parse(line)
|
|
|
|
# need to redraw?
|
|
if changed and (not coalesce or changed >= coalesce):
|
|
if sleep:
|
|
event.set()
|
|
else:
|
|
draw(ring)
|
|
if not cat:
|
|
ring.draw()
|
|
changed = 0
|
|
|
|
if not keep_open:
|
|
break
|
|
# don't just flood open calls
|
|
time.sleep(sleep or 0.1)
|
|
except FileNotFoundError as e:
|
|
print("error: file not found %r" % path)
|
|
sys.exit(-1)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
if sleep:
|
|
done = True
|
|
lock.acquire() # avoids https://bugs.python.org/issue42717
|
|
if not cat:
|
|
sys.stdout.write('\n')
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
import argparse
|
|
parser = argparse.ArgumentParser(
|
|
description="Display operations on block devices based on "
|
|
"trace output.",
|
|
allow_abbrev=False)
|
|
parser.add_argument(
|
|
'path',
|
|
nargs='?',
|
|
help="Path to read from.")
|
|
parser.add_argument(
|
|
'-r', '--read',
|
|
action='store_true',
|
|
help="Render reads.")
|
|
parser.add_argument(
|
|
'-p', '--prog',
|
|
action='store_true',
|
|
help="Render progs.")
|
|
parser.add_argument(
|
|
'-e', '--erase',
|
|
action='store_true',
|
|
help="Render erases.")
|
|
parser.add_argument(
|
|
'-w', '--wear',
|
|
action='store_true',
|
|
help="Render wear.")
|
|
parser.add_argument(
|
|
'-b', '--block',
|
|
type=lambda x: tuple(
|
|
int(x, 0) if x.strip() else None
|
|
for x in x.split(',')),
|
|
help="Show a specific block or range of blocks.")
|
|
parser.add_argument(
|
|
'-i', '--off',
|
|
type=lambda x: tuple(
|
|
int(x, 0) if x.strip() else None
|
|
for x in x.split(',')),
|
|
help="Show a specific offset or range of offsets.")
|
|
parser.add_argument(
|
|
'-B', '--block-size',
|
|
type=lambda x: int(x, 0),
|
|
help="Assume a specific block size.")
|
|
parser.add_argument(
|
|
'--block-count',
|
|
type=lambda x: int(x, 0),
|
|
help="Assume a specific block count.")
|
|
parser.add_argument(
|
|
'-C', '--block-cycles',
|
|
type=lambda x: int(x, 0),
|
|
help="Assumed maximum number of erase cycles when measuring wear.")
|
|
parser.add_argument(
|
|
'-R', '--reset',
|
|
action='store_true',
|
|
help="Reset wear on block device initialization.")
|
|
parser.add_argument(
|
|
'--color',
|
|
choices=['never', 'always', 'auto'],
|
|
default='auto',
|
|
help="When to use terminal colors. Defaults to 'auto'.")
|
|
parser.add_argument(
|
|
'--subscripts',
|
|
action='store_true',
|
|
help="Use unicode subscripts for showing wear.")
|
|
parser.add_argument(
|
|
'-:', '--dots',
|
|
action='store_true',
|
|
help="Use 1x2 ascii dot characters.")
|
|
parser.add_argument(
|
|
'-⣿', '--braille',
|
|
action='store_true',
|
|
help="Use 2x4 unicode braille characters. Note that braille characters "
|
|
"sometimes suffer from inconsistent widths.")
|
|
parser.add_argument(
|
|
'--chars',
|
|
help="Characters to use for read, prog, erase, noop operations.")
|
|
parser.add_argument(
|
|
'--wear-chars',
|
|
help="Characters to use for showing wear.")
|
|
parser.add_argument(
|
|
'--colors',
|
|
type=lambda x: [x.strip() for x in x.split(',')],
|
|
help="Colors to use for read, prog, erase, noop operations.")
|
|
parser.add_argument(
|
|
'--wear-colors',
|
|
type=lambda x: [x.strip() for x in x.split(',')],
|
|
help="Colors to use for showing wear.")
|
|
parser.add_argument(
|
|
'-W', '--width',
|
|
nargs='?',
|
|
type=lambda x: int(x, 0),
|
|
const=0,
|
|
help="Width in columns. 0 uses the terminal width. Defaults to "
|
|
"min(terminal, 80).")
|
|
parser.add_argument(
|
|
'-H', '--height',
|
|
nargs='?',
|
|
type=lambda x: int(x, 0),
|
|
const=0,
|
|
help="Height in rows. 0 uses the terminal height. Defaults to 1.")
|
|
parser.add_argument(
|
|
'-n', '--lines',
|
|
nargs='?',
|
|
type=lambda x: int(x, 0),
|
|
const=0,
|
|
help="Show this many lines of history. 0 uses the terminal height. "
|
|
"Defaults to 5.")
|
|
parser.add_argument(
|
|
'-z', '--cat',
|
|
action='store_true',
|
|
help="Pipe directly to stdout.")
|
|
parser.add_argument(
|
|
'-U', '--hilbert',
|
|
action='store_true',
|
|
help="Render as a space-filling Hilbert curve.")
|
|
parser.add_argument(
|
|
'-Z', '--lebesgue',
|
|
action='store_true',
|
|
help="Render as a space-filling Z-curve.")
|
|
parser.add_argument(
|
|
'-c', '--coalesce',
|
|
type=lambda x: int(x, 0),
|
|
help="Number of operations to coalesce together.")
|
|
parser.add_argument(
|
|
'-s', '--sleep',
|
|
type=float,
|
|
help="Time in seconds to sleep between reads, coalescing operations.")
|
|
parser.add_argument(
|
|
'-k', '--keep-open',
|
|
action='store_true',
|
|
help="Reopen the pipe on EOF, useful when multiple "
|
|
"processes are writing.")
|
|
sys.exit(main(**{k: v
|
|
for k, v in vars(parser.parse_intermixed_args()).items()
|
|
if v is not None}))
|