| #!/usr/bin/env python |
| |
| __doc__ = ''' |
| fsm-to-dot: convert FSM definitions to graph images |
| |
| Usage: |
| ./fsm-to-dot.py ~/openbsc/openbsc/src/libvlr/*.c |
| for f in *.dot ; do dot -Tpng $f > $f.png; done |
| # dot comes from 'apt-get install graphviz' |
| |
| Looks for osmo_fsm finite state machine definitions and madly parses .c files |
| to draw graphs of them. This uses wild regexes that rely on coding style etc.. |
| No proper C parsing is done here (pycparser sucked, unfortunately). |
| ''' |
| |
| import sys, re |
| |
def err(msg):
  """Write msg, followed by a newline, to standard error."""
  stream = sys.stderr
  stream.write(msg + '\n')
| |
class listdict(object):
  """A mapping from names to lists, stored directly in the instance __dict__.

  Attribute names that normal lookup cannot resolve are forwarded to the
  underlying dict, which is how callers reach items(), get() and friends.
  """

  def __getattr__(ld, name):
    # Only invoked when regular attribute lookup has failed.
    if name == 'add':
      return object.__getattribute__(ld, name)
    return getattr(ld.__dict__, name)

  def _have(ld, name):
    """Return the list stored under name, installing a fresh one if the
    entry is missing or empty."""
    entries = ld.__dict__.get(name)
    if entries:
      return entries
    entries = []
    ld.__dict__[name] = entries
    return entries

  def add(ld, name, item):
    """Append item to the list under name; returns the listdict itself."""
    ld._have(name).append(item)
    return ld

  def add_dict(ld, d):
    """Append each value of d to the list under its key."""
    for key, value in d.items():
      ld.add(key, value)

  def __setitem__(ld, name, val):
    ld.__dict__[name] = val

  def __getitem__(ld, name):
    return ld.__dict__[name]

  def __str__(ld):
    return str(ld.__dict__)

  def __repr__(ld):
    return repr(ld.__dict__)

  def update(ld, other_ld):
    """Extend the lists here with the lists of other_ld, name by name;
    returns the listdict itself."""
    for name, items in other_ld.items():
      ld.extend(name, items)
    return ld

  def extend(ld, name, vals):
    """Append all of vals to the list under name; returns the listdict
    itself."""
    ld._have(name).extend(vals)
    return ld
| |
# Patterns applied line by line to the body of a 'struct osmo_fsm_state x[]'
# initializer.

# '[STATE_NAME]' designator that opens one state's entry.
re_state_start = re.compile(r'\[([A-Z_][A-Z_0-9]*)\]')
# 'S(NAME)' items, used in both the event mask and the state mask.
re_event = re.compile(r'S\(([A-Z_][A-Z_0-9]*)\)')
# '.action = func_name' member. The dot is escaped so that only the '.action'
# member matches, not any word that merely ends in 'action'.
re_action = re.compile(r'\.action *= *([a-z_][a-z_0-9]*)')
| |
def state_starts(line):
  """Return the state name if line contains a '[STATE_NAME]' designator,
  else None."""
  match = re_state_start.search(line)
  return match.group(1) if match else None
| |
def in_event_starts(line):
  """Tell whether line mentions 'in_event_mask'."""
  return 'in_event_mask' in line
| |
def out_state_starts(line):
  """Tell whether line mentions 'out_state_mask'."""
  return 'out_state_mask' in line
| |
def states_or_events(line):
  """Return the NAME of every 'S(NAME)' item on line, in order of
  appearance."""
  return [match.group(1) for match in re_event.finditer(line)]
| |
def parse_action(line):
  """Return the function name of the first action assignment matched by
  re_action on line, or None if there is none."""
  match = re_action.search(line)
  if match is None:
    return None
  return match.group(1)
| |
| def _common_prefix(a, b): |
| for l in reversed(range(1,len(a))): |
| aa = a[:l+1] |
| if b.startswith(aa): |
| return aa |
| return '' |
| |
def common_prefix(strs):
  """Return the prefix shared by all strings in strs, as computed by folding
  _common_prefix() over them; '' if strs is empty or nothing is shared.

  With exactly one string, that string itself is returned.
  """
  if not strs:
    return ''
  remaining = iter(strs)
  prefix = next(remaining, None)
  for s in remaining:
    prefix = _common_prefix(prefix, s)
    if not prefix:
      return ''
  return prefix
| |
# Node kinds; stored in State.kind.
KIND_STATE, KIND_FUNC, KIND_FSM = 'KIND_STATE', 'KIND_FUNC', 'KIND_FSM'

# graphviz 'shape' attribute per node kind; None means no shape attribute is
# written for that kind.
BOX_SHAPES = dict([
  (KIND_STATE, None),
  (KIND_FUNC, 'box'),
  (KIND_FSM, 'box3d'),
])
| |
class Event:
  """A named event; short_name is the text shown in graph labels and starts
  out equal to name.

  Events compare, order and hash by name. The rich comparison methods are
  needed on Python 3, where __cmp__ is ignored and the cmp() builtin is gone.
  """

  def __init__(event, name):
    event.name = name
    event.short_name = name

  def __cmp__(event, other):
    # Python 2 comparison hook, written without the cmp() builtin so that it
    # is callable on Python 3 as well.
    return (event.name > other.name) - (event.name < other.name)

  def __eq__(event, other):
    if not isinstance(other, Event):
      return NotImplemented
    return event.name == other.name

  def __ne__(event, other):
    # Python 2 does not derive __ne__ from __eq__.
    result = event.__eq__(other)
    if result is NotImplemented:
      return result
    return not result

  def __lt__(event, other):
    if not isinstance(other, Event):
      return NotImplemented
    return event.name < other.name

  def __hash__(event):
    # Must agree with __eq__, which only looks at the name.
    return hash(event.name)
| |
class Edge:
  """A directed edge to to_state, carrying the events and action function
  names that are printed as its label, plus an optional graphviz style."""

  def __init__(edge, to_state, event_name=None, style=None, action=None):
    edge.to_state = to_state
    edge.style = style
    edge.events = []
    edge.actions = []
    edge.add_event_name(event_name)
    edge.add_action(action)

  def add_event_name(edge, event_name):
    """Add an Event named event_name; empty/None names are ignored."""
    if not event_name:
      return
    edge.add_event(Event(event_name))

  def add_event(edge, event):
    """Add event unless it is None or an event of that name is present.

    Duplicates are detected by name rather than by comparing Event objects,
    so this does not depend on how Event implements equality.
    """
    if not event:
      return
    if edge.has_event_name(event.name):
      return
    edge.events.append(event)

  def add_events(edge, events):
    for event in events:
      edge.add_event(event)

  def add_action(edge, action):
    """Add an action function name; empty/None and duplicates are ignored."""
    if not action or action in edge.actions:
      return
    edge.actions.append(action)

  def add_actions(edge, actions):
    for action in actions:
      edge.add_action(action)

  def event_names(edge):
    """Sorted full names of the events on this edge."""
    return sorted([event.name for event in edge.events])

  def event_labels(edge):
    """Sorted short names of the events on this edge."""
    return sorted([event.short_name for event in edge.events])

  def action_labels(edge):
    """Sorted action names, each with '()' appended."""
    return sorted([action + '()' for action in edge.actions])

  def has_event_name(edge, event_name):
    return event_name in edge.event_names()
| |
class State:
  """A node of the generated graph, together with its outgoing edges.

  'kind' selects the node shape via BOX_SHAPES; __init__ sets it to
  KIND_STATE.
  """
  name = None             # full name; None unless assigned by the creator
  short_name = None       # used as the node identifier in the dot output
  action = None           # name of the associated function, if any
  label = None            # explicit label; overrides the computed one
  in_event_names = None
  out_state_names = None
  out_edges = None
  kind = None

  def __init__(state):
    state.in_event_names = []
    state.out_state_names = []
    state.out_edges = []
    state.kind = KIND_STATE

  def add_out_edge(state, edge):
    """Add edge to the outgoing edges.

    If there already is an edge to the same target state in the same style,
    the events and actions of edge are merged into it instead. An edge to the
    same target in a different style is kept as a separate edge.

    Raises Exception if an existing edge leads to a different state object
    that has the same label as the new edge's target.
    """
    for out_edge in state.out_edges:
      if out_edge.to_state is edge.to_state:
        if out_edge.style == edge.style:
          out_edge.add_events(edge.events)
          out_edge.add_actions(edge.actions)
          return
        # Same target, other style: this is not a label clash, so the sanity
        # check below must not be applied to it.
        continue
      # sanity
      if out_edge.to_state.get_label() == edge.to_state.get_label():
        raise Exception('Two distinct states exist with identical labels.')
    state.out_edges.append(edge)

  def get_label(state):
    """Return the explicit label if set, else the short name and 'action()'
    joined by a literal backslash-n; the short name is left out when it
    equals the action name."""
    if state.label:
      return state.label
    l = [state.short_name]
    if state.action:
      if state.short_name == state.action:
        l = []
      l.append(state.action + '()')
    return r'\n'.join(l)

  def event_names(state):
    """Event names of all outgoing edges, edge by edge."""
    event_names = []
    for out_edge in state.out_edges:
      event_names.extend(out_edge.event_names())
    return event_names

  def shape_str(state):
    """Return ',shape=<shape>' for this node's kind, or '' if BOX_SHAPES has
    no shape for it."""
    shape = BOX_SHAPES.get(state.kind, None)
    if not shape:
      return ''
    return ',shape=%s' % shape

  def __repr__(state):
    return 'State(name=%r,short_name=%r,out=%d)' % (state.name, state.short_name, len(state.out_edges))
| |
class Fsm:
  """One FSM definition and the graph that is built up for it.

  fsm.states first receives the states parsed from the C state table and is
  later extended with additional nodes for functions and other FSMs.
  """

  def __init__(fsm, struct_name, states_struct_name, from_file=None):
    """Store the FSM's struct name, the name of its states array and,
    optionally, the file it was found in. Nothing is parsed here."""
    fsm.states = []
    fsm.struct_name = struct_name
    fsm.states_struct_name = states_struct_name
    fsm.from_file = from_file
    fsm.action_funcs = set()
    fsm.event_names = set()

  def parse_states(fsm, src):
    """Parse the body of a states array initializer, line by line.

    Each '[NAME]' line opens a new State. 'S(...)' items are collected as
    incoming events after a line mentioning in_event_mask and as outgoing
    states after a line mentioning out_state_mask. Afterwards the short
    names and the edges between the parsed states are set up.
    """
    state = None
    started = None

    IN_EVENTS = 'events'
    OUT_STATES = 'states'

    for line in src.splitlines():
      state_name = state_starts(line)
      if state_name:
        state = State()
        fsm.states.append(state)
        started = None
        state.name = state_name

      if in_event_starts(line):
        started = IN_EVENTS
      if out_state_starts(line):
        started = OUT_STATES

      # Lines are only evaluated once a state and one of its masks has begun.
      if not state or not started:
        continue

      tokens = states_or_events(line)
      if started == IN_EVENTS:
        state.in_event_names.extend(tokens)
      elif started == OUT_STATES:
        state.out_state_names.extend(tokens)
      else:
        err('ignoring: %r' % tokens)

      a = parse_action(line)
      if a:
        state.action = a

    for state in fsm.states:
      if state.action:
        fsm.action_funcs.add(state.action)
      if state.in_event_names:
        fsm.event_names.update(state.in_event_names)

    fsm.make_states_short_names()
    fsm.ref_out_states()

  def make_states_short_names(fsm):
    """Set each state's short_name to its name minus the prefix common to
    all state names; returns the prefix that was stripped.

    Nothing is stripped if that would leave some state with an empty short
    name (a single state, or a name that is the prefix of all others), since
    short_name is used as the node identifier in the dot output.
    """
    p = common_prefix([s.name for s in fsm.states])
    if any(len(s.name) <= len(p) for s in fsm.states):
      p = ''
    for s in fsm.states:
      s.short_name = s.name[len(p):]
    return p

  def make_events_short_names(fsm):
    """Set the short_name of every event on every edge to its name minus the
    prefix common to all of this FSM's event names.

    Nothing is stripped if that would reduce some event name to ''.
    """
    p = common_prefix(fsm.event_names)
    if any(len(name) <= len(p) for name in fsm.event_names):
      p = ''
    for state in fsm.states:
      for edge in state.out_edges:
        for event in edge.events:
          event.short_name = event.name[len(p):]

  def ref_out_states(fsm):
    """Create an edge for every name in each state's out_state_names.

    Raises Exception (from find_state_by_name) for an unknown state name.
    """
    for state in fsm.states:
      for e in [Edge(fsm.find_state_by_name(n, True)) for n in state.out_state_names]:
        state.add_out_edge(e)

  def find_state_by_name(fsm, name, strict=False):
    """Return the first state with that name. If there is none, return None,
    or raise Exception when strict is set."""
    for state in fsm.states:
      if state.name == name:
        return state
    if strict:
      raise Exception("State not found: %r" % name)
    return None

  def find_state_by_action(fsm, action):
    """Return the first state whose action is the given function name, or
    None."""
    for state in fsm.states:
      if state.action == action:
        return state
    return None

  def add_special_state(fsm, additional_states, name, in_state=None,
                        out_state=None, event_name=None, kind=KIND_FUNC,
                        state_action=None, label=None, edge_action=None):
    """Make sure a node with short_name == name exists and link it.

    The node is looked up in additional_states, then in fsm.states; if it is
    in neither, a new State is created and appended to additional_states.
    For KIND_FUNC nodes the action defaults to name.

    A dotted edge is added from the node to out_state and/or from in_state
    to the node, labelled with event_name and edge_action where given.
    """
    additional_state = None
    for s in additional_states:
      if s.short_name == name:
        additional_state = s
        break

    if not additional_state:
      for s in fsm.states:
        if s.short_name == name:
          additional_state = s
          break

    if kind == KIND_FUNC and not state_action:
      state_action = name

    if not additional_state:
      additional_state = State()
      additional_state.short_name = name
      additional_state.action = state_action
      additional_state.kind = kind
      additional_state.label = label
      additional_states.append(additional_state)

    if out_state:
      additional_state.out_state_names.append(out_state.name)
      additional_state.add_out_edge(Edge(out_state, event_name, 'dotted',
                                         action=edge_action))

    if in_state:
      in_state.out_state_names.append(additional_state.name)
      in_state.add_out_edge(Edge(additional_state, event_name, 'dotted',
                                 action=edge_action))

  def find_event_edges(fsm, c_files):
    """Use the parsed C functions in c_files to label edges with events and
    to add nodes for functions that take part in the FSM.

    c_files must be iterable (yielding objects with find_state_transitions())
    and itself offer find_event_sources() and find_callers().
    """
    # enrich state transitions between the states with event labels
    func_to_state_transitions = listdict()
    for c_file in c_files:
      func_to_state_transitions.update( c_file.find_state_transitions(fsm.event_names) )

    # edges between explicit states
    for state in fsm.states:
      transitions = func_to_state_transitions.get(state.action)
      if not transitions:
        continue

      for to_state_name, event_name in transitions:
        if not event_name:
          continue
        for out_edge in state.out_edges:
          if out_edge.to_state.name == to_state_name:
            out_edge.add_event_name(event_name)

    additional_states = []

    # functions that aren't state actions but still effect state transitions
    for func_name, transitions in func_to_state_transitions.items():
      if func_name in fsm.action_funcs:
        continue
      for to_state_name, event_name in transitions:
        to_state = fsm.find_state_by_name(to_state_name)
        if not to_state:
          continue
        fsm.add_special_state(additional_states, func_name, None, to_state, event_name)

    event_sources = c_files.find_event_sources(fsm.event_names)

    for state in fsm.states:

      for in_event_name in state.in_event_names:
        funcs_for_in_event = event_sources.get(in_event_name)
        if not funcs_for_in_event:
          continue

        for out_edge in state.out_edges:
          if out_edge.has_event_name(in_event_name):
            # NOTE(review): this sets an 'action' attribute on the edge;
            # to_dot() only prints edge.actions, so this text is not shown
            # in the graph. Left unchanged; confirm the intent.
            out_edge.action = r'\n'.join([(f + '()') for f in funcs_for_in_event
                                          if f != state.action])

        # if any functions that don't belong to a state trigger events, add
        # them to the graph as well
        additional_funcs = [f for f in funcs_for_in_event if f not in fsm.action_funcs]
        for af in additional_funcs:
          fsm.add_special_state(additional_states, af, None, state, in_event_name)

    fsm.states.extend(additional_states)

    # do any existing action functions by chance call other action functions?
    for state in fsm.states:
      if not state.action:
        continue
      callers = c_files.find_callers(state.action)
      if not callers:
        continue
      for other_state in fsm.states:
        if other_state.action in callers:
          other_state.add_out_edge(Edge(state, None, 'dotted'))

  def add_fsm_alloc(fsm, c_files):
    """Add a function node, with an edge to the first state, for every
    function that the c_files list as an allocator of this FSM."""
    allocating_funcs = []
    for c_file in c_files:
      allocating_funcs.extend(c_file.fsm_allocators.get(fsm.struct_name, []))

    starting_state = None
    if fsm.states:
      # assume the first state starts
      starting_state = fsm.states[0]

    additional_states = []
    for func_name in allocating_funcs:
      fsm.add_special_state(additional_states, func_name, None, starting_state)

    fsm.states.extend(additional_states)

  def add_cross_fsm_links(fsm, fsms, c_files, fsm_meta):
    """Link this FSM with the other FSMs in fsms whose nodes call into it.

    For every caller of one of this FSM's actions that is itself the action
    of a node in another FSM, a KIND_FSM node is added to both graphs and an
    edge between the two FSMs is recorded in fsm_meta.
    """
    # Iterate over a snapshot: add_special_state() below appends to
    # fsm.states. The nodes it appends are KIND_FSM nodes without an action,
    # which this loop would skip anyway.
    for state in list(fsm.states):
      if not state.action:
        continue
      if state.kind == KIND_FSM:
        continue
      callers = c_files.find_callers(state.action)

      if state.kind == KIND_FUNC:
        callers.append(state.action)

      if not callers:
        continue

      for caller in callers:
        for calling_fsm in fsms:
          if calling_fsm is fsm:
            continue
          calling_state = calling_fsm.find_state_by_action(caller)
          if not calling_state:
            continue
          if calling_state.kind == KIND_FSM:
            continue

          # in the calling FSM's graph: calling node -> this FSM
          label = None
          if state.kind == KIND_STATE:
            label = fsm.struct_name + ': ' + state.short_name
          edge_action = caller
          if calling_state.action == edge_action:
            edge_action = None
          calling_fsm.add_special_state(calling_fsm.states, fsm.struct_name,
                                        calling_state, kind=KIND_FSM,
                                        edge_action=edge_action, label=label)

          # in this FSM's graph: calling FSM -> called node
          label = None
          if calling_state.kind == KIND_STATE:
            label = calling_fsm.struct_name + ': ' + calling_state.short_name
          edge_action = caller
          if state.action == edge_action:
            edge_action = None
          fsm.add_special_state(fsm.states, calling_fsm.struct_name, None,
                                state, kind=KIND_FSM, edge_action=edge_action,
                                label=label)

          # meta overview
          meta_called_fsm = fsm_meta.have_state(fsm.struct_name, KIND_FSM)
          meta_calling_fsm = fsm_meta.have_state(calling_fsm.struct_name, KIND_FSM)
          meta_calling_fsm.add_out_edge(Edge(meta_called_fsm))

  def have_state(fsm, name, kind=KIND_STATE):
    """Return the state called name, creating and appending one of the given
    kind if it does not exist yet."""
    state = fsm.find_state_by_name(name)
    if not state:
      state = State()
      state.name = name
      state.short_name = name
      state.kind = kind
      fsm.states.append(state)
    return state

  def to_dot(fsm):
    """Return the graph as graphviz dot source: one node line per state,
    then one line per outgoing edge."""
    out = ['digraph G {', 'rankdir=LR;']

    for state in fsm.states:
      out.append('%s [label="%s"%s]' % (state.short_name, state.get_label(),
                                        state.shape_str()))

    for state in fsm.states:
      for out_edge in state.out_edges:
        attrs = []
        labels = []
        if out_edge.events:
          labels.extend(out_edge.event_labels())
        if out_edge.actions:
          labels.extend(out_edge.action_labels())
        if labels:
          attrs.append('label="%s"' % (r'\n'.join(labels)))
        if out_edge.style:
          attrs.append('style=%s'% out_edge.style)
        attrs_str = ''
        if attrs:
          attrs_str = ' [%s]' % (','.join(attrs))
        out.append('%s->%s%s' % (state.short_name, out_edge.to_state.short_name, attrs_str))

    out.append('}\n')

    return '\n'.join(out)

  def write_dot_file(fsm):
    """Write the dot source to '<struct_name>.dot' in the current directory
    and print that path."""
    dot_path = '%s.dot' % fsm.struct_name
    # 'with' closes the file even if generating or writing the text fails.
    with open(dot_path, 'w') as f:
      f.write(fsm.to_dot())
    print(dot_path)
| |
| |
# Patterns applied to whole C source files or function bodies. Each captures
# one identifier.

# 'struct osmo_fsm NAME =' captures NAME
re_fsm = re.compile(
  r'struct osmo_fsm ([a-z_][a-z_0-9]*) =')

# 'states = NAME,' captures NAME
re_fsm_states_struct_name = re.compile(
  r'\bstates = ([a-z_][a-z_0-9]*)\W*,')

# 'struct osmo_fsm_state NAME[] =' captures NAME
re_fsm_states = re.compile(
  r'struct osmo_fsm_state ([a-z_][a-z_0-9]*)\[\] =')

# 'name(...)' followed by a '{' at the start of a line captures name
re_func = re.compile(
  r'(\b[a-z_][a-z_0-9]*\b)\([^)]*\)\W*^{',
  re.MULTILINE)

# the second argument of an osmo_fsm_inst_state_chg() call
re_state_trigger = re.compile(
  r'osmo_fsm_inst_state_chg\([^,]+,\W*([A-Z_][A-Z_0-9]*)\W*,',
  re.MULTILINE)

# the '&name' first argument of an osmo_fsm_inst_alloc...() call
re_fsm_alloc = re.compile(
  r'osmo_fsm_inst_alloc[_child]*\(\W*&([a-z_][a-z_0-9]*),',
  re.MULTILINE)

# the second argument of an osmo_fsm_inst_dispatch() call
re_fsm_event_dispatch = re.compile(
  r'osmo_fsm_inst_dispatch\(\W*[^,]+,\W*([A-Z_][A-Z_0-9]*)\W*,',
  re.MULTILINE)
| |
class CFile():
  """The text of one C source file plus what has been extracted from it.

  funcs maps function names to function body text, fsm_allocators maps FSM
  struct names to the functions allocating them, and event_sources maps
  event names to the functions dispatching them.
  """

  def __init__(c_file, path):
    """Read the file at path; parsing is done by the other methods."""
    c_file.path = path
    with open(path) as f:
      c_file.src = f.read()
    c_file.funcs = {}
    c_file.fsm_allocators = listdict()
    # Present from the start, so that reading it before find_event_sources()
    # has run yields an empty result instead of an AttributeError.
    c_file.event_sources = listdict()

  def extract_block(c_file, brace_open, brace_close, start):
    """Return the text between the first brace_open at or after start and
    its matching brace_close, both exclusive.

    If there is no brace_open, or the file ends before the block is closed,
    a diagnostic is printed and '' is returned.
    """
    src = c_file.src
    block_start = src.find(brace_open, start)
    pos = block_start

    if block_start >= 0:
      level = 1
      for pos in range(block_start + 1, len(src)):
        ch = src[pos]
        if ch == brace_open:
          level += 1
        elif ch == brace_close:
          level -= 1
          if level == 0:
            return src[block_start+1:pos]
      # ran off the end of the file with the block still open
      pos = len(src)

    print("Error while trying to extract a code block from %r char pos %d" % (c_file.path, pos))
    print("Block start at char pos %d" % block_start)
    if block_start >= 0:
      # max(0, ...) keeps the context slices from wrapping around to the end
      # of the file near offset 0.
      print(src[max(0, block_start - 20) : block_start + 20])
      print('...')
      print(src[max(0, pos - 20) : pos + 20])
    return ''

  def find_fsms(c_file):
    """Return an Fsm for every 'struct osmo_fsm x =' definition in the file.

    Raises IndexError if a definition has no 'states = ...' member.
    """
    fsms = []
    for m in re_fsm.finditer(c_file.src):
      struct_name = m.group(1)
      struct_def = c_file.extract_block('{', '}', m.start())
      states_struct_name = re_fsm_states_struct_name.findall(struct_def)[0]
      fsm = Fsm(struct_name, states_struct_name, c_file)
      fsms.append(fsm)
    return fsms

  def find_fsm_states(c_file, fsms):
    """For every states array in this file, let each Fsm in fsms that refers
    to that array by name parse its contents."""
    for m in re_fsm_states.finditer(c_file.src):
      states_struct_name = m.group(1)
      for fsm in fsms:
        if states_struct_name == fsm.states_struct_name:
          fsm.parse_states(c_file.extract_block('{', '}', m.start()))

  def parse_functions(c_file):
    """Fill c_file.funcs with the body text of each function matched by
    re_func, then refresh c_file.fsm_allocators."""
    funcs = {}
    for m in re_func.finditer(c_file.src):
      name = m.group(1)
      func_src = c_file.extract_block('{', '}', m.start())
      funcs[name] = func_src
    c_file.funcs = funcs
    c_file.find_fsm_allocators()

  def find_callers(c_file, func_name):
    """Return the names of the functions whose body contains the text
    'func_name('."""
    func_call = func_name + '('
    return [caller for caller, src in c_file.funcs.items()
            if func_call in src]

  def find_fsm_allocators(c_file):
    """Rebuild c_file.fsm_allocators from the FSM allocation calls found in
    the function bodies."""
    c_file.fsm_allocators = listdict()
    for func_name, src in c_file.funcs.items():
      for m in re_fsm_alloc.finditer(src):
        fsm_struct_name = m.group(1)
        c_file.fsm_allocators.add(fsm_struct_name, func_name)

  def find_state_transitions(c_file, event_names):
    """Return a listdict: function name -> list of (to_state, event) pairs.

    For each state change call in a function body, event is the last of the
    given event_names mentioned before the call in that body, or None.
    """
    TO_STATE = 'TO_STATE'
    EVENT = 'EVENT'
    func_to_state_transitions = listdict()

    for func_name, src in c_file.funcs.items():
      found_tokens = []

      for m in re_state_trigger.finditer(src):
        found_tokens.append((m.start(), TO_STATE, m.group(1)))

      for event_name in event_names:
        re_this_event = re.compile(r'\b(' + event_name + r')\b')
        for m in re_this_event.finditer(src):
          found_tokens.append((m.start(), EVENT, m.group(1)))

      # order by position in the function body
      found_tokens.sort()

      last_event = None
      for start, kind, name in found_tokens:
        if kind == EVENT:
          last_event = name
        else:
          func_to_state_transitions.add(func_name, (name, last_event))

    return func_to_state_transitions

  def find_event_sources(c_file, event_names):
    """Rebuild c_file.event_sources: event name -> functions dispatching it.

    event_names is not used; all dispatched events are recorded.
    """
    c_file.event_sources = listdict()
    for func_name, src in c_file.funcs.items():
      for m in re_fsm_event_dispatch.finditer(src):
        event_name = m.group(1)
        c_file.event_sources.add(event_name, func_name)
| |
class CFiles(list):
  """A list of CFile objects with queries that span all of them."""

  def find_callers(c_files, func_name):
    """Return the callers of func_name reported by every file, in file
    order."""
    callers = []
    for c_file in c_files:
      callers.extend(c_file.find_callers(func_name))
    return callers

  def find_func_to_state_transitions(c_files, event_names=()):
    """Return the state transitions of all files merged into one listdict.

    event_names is handed to each file's find_state_transitions(). It is an
    explicit parameter because this method has no FSM of its own to take the
    event names from.
    """
    func_to_state_transitions = listdict()
    for c_file in c_files:
      func_to_state_transitions.update( c_file.find_state_transitions(event_names) )
    return func_to_state_transitions

  def find_event_sources(c_files, event_names):
    """Return a listdict: event name -> dispatching functions from all
    files, limited to the events listed in event_names."""
    event_sources = listdict()
    for c_file in c_files:
      for event, sources in c_file.event_sources.items():
        if event in event_names:
          event_sources.extend(event, sources)
    return event_sources
| |
# Load every distinct path given on the command line, in the given order.
c_files = CFiles()
paths_seen = set()
for path in sys.argv[1:]:
  if path not in paths_seen:
    paths_seen.add(path)
    c_file = CFile(path)
    c_files.append(c_file)

# Function bodies first; the later steps search them.
for c_file in c_files:
  c_file.parse_functions()

# Collect the FSM definitions of all files ...
fsms = []
for c_file in c_files:
  fsms += c_file.find_fsms()

# ... then let each file supply state tables and event sources.
for c_file in c_files:
  c_file.find_fsm_states(fsms)
  c_file.find_event_sources(fsms)

for fsm in fsms:
  fsm.find_event_edges(c_files)
  fsm.add_fsm_alloc(c_files)

# The meta graph gets a node per FSM that is involved in a cross-FSM link.
fsm_meta = Fsm("meta", "meta")
for fsm in fsms:
  fsm.add_cross_fsm_links(fsms, c_files, fsm_meta)

for fsm in fsms:
  fsm.make_events_short_names()

# One dot file per FSM, and finally the overview.
for fsm in fsms:
  fsm.write_dot_file()

fsm_meta.write_dot_file()
| |
| |
| # vim: tabstop=2 shiftwidth=2 expandtab |