lib.frr

  1import re
  2from typing import Dict, Any
  3
  4from SRE.lib_sre import Grade0
  5
  6
  7def get_ospf_interfaces(grade: Grade0, machine_name: str, step: int = 1) -> Dict[str, Dict[str, Any]]:
  8    """Run 'vtysh -c "show ip ospf interface"' and parse the output.
  9
 10    Returns a dict mapping interface name -> dict of parsed fields:
 11      state            : 'up' | 'down'
 12      internet_address : str  e.g. '10.150.180.224/28'
 13      broadcast        : str  e.g. '10.150.180.239'
 14      area             : str  e.g. '0.0.0.0'
 15      router_id        : str
 16      network_type     : str  e.g. 'BROADCAST'
 17      cost             : int
 18      ospf_state       : str  e.g. 'Backup', 'DR', 'DROther'
 19      priority         : int
 20      hello_interval   : int  (seconds)
 21      dead_interval    : int  (seconds)
 22      retransmit_interval : int (seconds)
 23      neighbor_count          : int
 24      adjacent_neighbor_count : int
 25      dr_id            : str | None
 26      dr_address       : str | None
 27      bdr_id           : str | None
 28      bdr_address      : str | None
 29
 30    Returns {} on error or empty output.
 31    """
 32    output, code = grade.test(machine_name, 'vtysh -c "show ip ospf interface"', step=step)
 33    if code != 0 or not output:
 34        return {}
 35
 36    result: Dict[str, Dict[str, Any]] = {}
 37    current: Dict[str, Any] | None = None
 38
 39    for line in output.splitlines():
 40        # Interface header: "ethX is up"
 41        m = re.match(r'^(\S+) is (up|down)', line)
 42        if m:
 43            current = {
 44                'state': m.group(2),
 45                'internet_address': None, 'broadcast': None, 'area': None,
 46                'router_id': None, 'network_type': None, 'cost': None,
 47                'ospf_state': None, 'priority': None,
 48                'hello_interval': None, 'dead_interval': None, 'retransmit_interval': None,
 49                'neighbor_count': None, 'adjacent_neighbor_count': None,
 50                'dr_id': None, 'dr_address': None,
 51                'bdr_id': None, 'bdr_address': None,
 52            }
 53            result[m.group(1)] = current
 54            continue
 55
 56        if current is None:
 57            continue
 58
 59        # Internet Address 10.150.180.224/28, Broadcast 10.150.180.239, Area 0.0.0.0
 60        m = re.search(r'Internet Address (\S+?),\s*Broadcast (\S+?),\s*Area (\S+)', line)
 61        if m:
 62            current['internet_address'] = m.group(1)
 63            current['broadcast'] = m.group(2)
 64            current['area'] = m.group(3)
 65            continue
 66
 67        # Router ID 10.150.180.224, Network Type BROADCAST, Cost: 10
 68        m = re.search(r'Router ID (\S+?),\s*Network Type (\S+?),\s*Cost:\s*(\d+)', line)
 69        if m:
 70            current['router_id'] = m.group(1)
 71            current['network_type'] = m.group(2)
 72            current['cost'] = int(m.group(3))
 73            continue
 74
 75        # Transmit Delay is 1 sec, State Backup, Priority 1
 76        m = re.search(r'State (\S+?),\s*Priority (\d+)', line)
 77        if m:
 78            current['ospf_state'] = m.group(1).rstrip(',')
 79            current['priority'] = int(m.group(2))
 80            continue
 81
 82        # Designated Router (ID) 172.23.52.228 Interface Address 10.150.180.230/28
 83        m = re.search(r'Designated Router \(ID\) (\S+)\s+Interface Address (\S+)', line)
 84        if m and 'Backup' not in line:
 85            current['dr_id'] = m.group(1)
 86            current['dr_address'] = m.group(2)
 87            continue
 88
 89        # Backup Designated Router (ID) 10.150.180.224, Interface Address 10.150.180.224
 90        m = re.search(r'Backup Designated Router \(ID\) (\S+?),\s*Interface Address (\S+)', line)
 91        if m:
 92            current['bdr_id'] = m.group(1)
 93            current['bdr_address'] = m.group(2)
 94            continue
 95
 96        # Timer intervals configured, Hello 10s, Dead 40s, Wait 40s, Retransmit 5
 97        m = re.search(r'Hello (\d+)s,\s*Dead (\d+)s,.*Retransmit (\d+)', line)
 98        if m:
 99            current['hello_interval'] = int(m.group(1))
100            current['dead_interval'] = int(m.group(2))
101            current['retransmit_interval'] = int(m.group(3))
102            continue
103
104        # Neighbor Count is 1, Adjacent neighbor count is 1
105        m = re.search(r'Neighbor Count is (\d+),\s*Adjacent neighbor count is (\d+)', line)
106        if m:
107            current['neighbor_count'] = int(m.group(1))
108            current['adjacent_neighbor_count'] = int(m.group(2))
109            continue
110
111    return result
def get_ospf_interfaces(grade: SRE.lib_sre.Grade0, machine_name: str, step: int = 1) -> Dict[str, Dict[str, Any]]:
  8def get_ospf_interfaces(grade: Grade0, machine_name: str, step: int = 1) -> Dict[str, Dict[str, Any]]:
  9    """Run 'vtysh -c "show ip ospf interface"' and parse the output.
 10
 11    Returns a dict mapping interface name -> dict of parsed fields:
 12      state            : 'up' | 'down'
 13      internet_address : str  e.g. '10.150.180.224/28'
 14      broadcast        : str  e.g. '10.150.180.239'
 15      area             : str  e.g. '0.0.0.0'
 16      router_id        : str
 17      network_type     : str  e.g. 'BROADCAST'
 18      cost             : int
 19      ospf_state       : str  e.g. 'Backup', 'DR', 'DROther'
 20      priority         : int
 21      hello_interval   : int  (seconds)
 22      dead_interval    : int  (seconds)
 23      retransmit_interval : int (seconds)
 24      neighbor_count          : int
 25      adjacent_neighbor_count : int
 26      dr_id            : str | None
 27      dr_address       : str | None
 28      bdr_id           : str | None
 29      bdr_address      : str | None
 30
 31    Returns {} on error or empty output.
 32    """
 33    output, code = grade.test(machine_name, 'vtysh -c "show ip ospf interface"', step=step)
 34    if code != 0 or not output:
 35        return {}
 36
 37    result: Dict[str, Dict[str, Any]] = {}
 38    current: Dict[str, Any] | None = None
 39
 40    for line in output.splitlines():
 41        # Interface header: "ethX is up"
 42        m = re.match(r'^(\S+) is (up|down)', line)
 43        if m:
 44            current = {
 45                'state': m.group(2),
 46                'internet_address': None, 'broadcast': None, 'area': None,
 47                'router_id': None, 'network_type': None, 'cost': None,
 48                'ospf_state': None, 'priority': None,
 49                'hello_interval': None, 'dead_interval': None, 'retransmit_interval': None,
 50                'neighbor_count': None, 'adjacent_neighbor_count': None,
 51                'dr_id': None, 'dr_address': None,
 52                'bdr_id': None, 'bdr_address': None,
 53            }
 54            result[m.group(1)] = current
 55            continue
 56
 57        if current is None:
 58            continue
 59
 60        # Internet Address 10.150.180.224/28, Broadcast 10.150.180.239, Area 0.0.0.0
 61        m = re.search(r'Internet Address (\S+?),\s*Broadcast (\S+?),\s*Area (\S+)', line)
 62        if m:
 63            current['internet_address'] = m.group(1)
 64            current['broadcast'] = m.group(2)
 65            current['area'] = m.group(3)
 66            continue
 67
 68        # Router ID 10.150.180.224, Network Type BROADCAST, Cost: 10
 69        m = re.search(r'Router ID (\S+?),\s*Network Type (\S+?),\s*Cost:\s*(\d+)', line)
 70        if m:
 71            current['router_id'] = m.group(1)
 72            current['network_type'] = m.group(2)
 73            current['cost'] = int(m.group(3))
 74            continue
 75
 76        # Transmit Delay is 1 sec, State Backup, Priority 1
 77        m = re.search(r'State (\S+?),\s*Priority (\d+)', line)
 78        if m:
 79            current['ospf_state'] = m.group(1).rstrip(',')
 80            current['priority'] = int(m.group(2))
 81            continue
 82
 83        # Designated Router (ID) 172.23.52.228 Interface Address 10.150.180.230/28
 84        m = re.search(r'Designated Router \(ID\) (\S+)\s+Interface Address (\S+)', line)
 85        if m and 'Backup' not in line:
 86            current['dr_id'] = m.group(1)
 87            current['dr_address'] = m.group(2)
 88            continue
 89
 90        # Backup Designated Router (ID) 10.150.180.224, Interface Address 10.150.180.224
 91        m = re.search(r'Backup Designated Router \(ID\) (\S+?),\s*Interface Address (\S+)', line)
 92        if m:
 93            current['bdr_id'] = m.group(1)
 94            current['bdr_address'] = m.group(2)
 95            continue
 96
 97        # Timer intervals configured, Hello 10s, Dead 40s, Wait 40s, Retransmit 5
 98        m = re.search(r'Hello (\d+)s,\s*Dead (\d+)s,.*Retransmit (\d+)', line)
 99        if m:
100            current['hello_interval'] = int(m.group(1))
101            current['dead_interval'] = int(m.group(2))
102            current['retransmit_interval'] = int(m.group(3))
103            continue
104
105        # Neighbor Count is 1, Adjacent neighbor count is 1
106        m = re.search(r'Neighbor Count is (\d+),\s*Adjacent neighbor count is (\d+)', line)
107        if m:
108            current['neighbor_count'] = int(m.group(1))
109            current['adjacent_neighbor_count'] = int(m.group(2))
110            continue
111
112    return result

Run 'vtysh -c "show ip ospf interface"' and parse the output.

Returns a dict mapping interface name -> dict of parsed fields:

- state : 'up' | 'down'
- internet_address : str, e.g. '10.150.180.224/28'
- broadcast : str, e.g. '10.150.180.239'
- area : str, e.g. '0.0.0.0'
- router_id : str
- network_type : str, e.g. 'BROADCAST'
- cost : int
- ospf_state : str, e.g. 'Backup', 'DR', 'DROther'
- priority : int
- hello_interval : int (seconds)
- dead_interval : int (seconds)
- retransmit_interval : int (seconds)
- neighbor_count : int
- adjacent_neighbor_count : int
- dr_id : str | None
- dr_address : str | None
- bdr_id : str | None
- bdr_address : str | None

Returns {} on error or empty output.