Skip to content
Snippets Groups Projects
Commit d702dc30 authored by Tomas Hlavacek's avatar Tomas Hlavacek
Browse files

Omnia: sfpswitch refactor and add partial resets

Refacotr sfpswitch script in order to reuse code.
Add support for better --oneshot (set SFP, do not daemonize)
Add support for --resetled (Reset LED status, do not touch SFP mode,
do not daemonize).
parent 70d35059
No related branches found
No related tags found
No related merge requests found
......@@ -5,139 +5,192 @@ import os
import select
import time
debug = 1
sfpdet_pin = 508
sfpdis_pin = 505
sfplos_pin = 507
sfpflt_pin = 504
gpio_export = '/sys/class/gpio/export'
sfp_select = '/sys/devices/platform/soc/soc:internal-regs/f1034000.ethernet/net/eth1/phy_select'
map = { 1: 'phy-def', 0: 'phy-sfp' }
modemap = { 1: 'phy-def', 0: 'phy-sfp' }
cmd_net_res = 'ip link set down dev eth1; /etc/init.d/network restart'
cmd_safety_sleep = 2
wan_led = '/sys/devices/platform/soc/soc:internal-regs/f1011000.i2c/i2c-0/i2c-1/1-002b/leds/omnia-led:wan'
def write_once(path, value):
with open(path, 'w') as f:
f.write(value)
def d(message):
if debug:
print message
def gpio_dir(pin):
return '/sys/class/gpio/gpio%d/' % pin
def init_gpio(pin):
if not (os.path.exists(gpio_dir(pin)) and
os.path.isdir(gpio_dir(pin))):
write_once(gpio_export, str(pin))
class GPIO:
gpio_export = '/sys/class/gpio/export'
if not (os.path.exists(gpio_dir(pin)) and
os.path.isdir(gpio_dir(pin))):
raise Exception('Can not access %s' % gpio_dir(pin))
def _sysfs_dir(self):
return '/sys/class/gpio/gpio%d/' % self.pin
def init():
init_gpio(sfpdet_pin)
write_once(os.path.join(gpio_dir(sfpdet_pin), 'direction'), 'in')
write_once(os.path.join(gpio_dir(sfpdet_pin), 'edge'), 'both')
init_gpio(sfpdis_pin)
write_once(os.path.join(gpio_dir(sfpdis_pin), 'direction'), 'out')
write_once(os.path.join(gpio_dir(sfpdis_pin), 'value'), '0')
def __init__(self, pin, direction, edge=None, value=None):
self.pin = pin
d = self._sysfs_dir()
if not (os.path.exists(d) and os.path.isdir(d)):
with open(GPIO.gpio_export, 'w') as f:
f.write(str(pin))
if not (os.path.exists(d) and os.path.isdir(d)):
raise Exception('Can not access %s' % d)
with open(os.path.join(d, 'direction'), 'w') as f:
f.write(direction)
if direction == 'in':
self.fd = open(os.path.join(d, 'value'), 'r')
elif direction == 'out':
self.fd = open(os.path.join(d, 'value'), 'w')
else:
raise Exception('Unknown direction %s' % direction)
if edge:
with open(os.path.join(d, 'edge'), 'w') as f:
f.write(edge)
if value:
self.fd.write(value)
def read(self):
self.fd.seek(0)
return int(self.fd.read().strip())
def write(self, val):
self.fd.write(val)
def getfd(self):
return self.fd.fileno()
class LED:
def __init__(self, sysfsdir):
self.sysfsdir = sysfsdir
def set_autonomous(self, aut):
with open(os.path.join(self.sysfsdir, 'autonomous'), 'w') as f:
f.write('1' if aut else '0')
def set_brightness(self, bright):
with open(os.path.join(self.sysfsdir, 'brightness'), 'w') as f:
f.write('1' if bright else '0')
init_gpio(sfplos_pin)
write_once(os.path.join(gpio_dir(sfplos_pin), 'direction'), 'in')
write_once(os.path.join(gpio_dir(sfplos_pin), 'edge'), 'both')
init_gpio(sfpflt_pin)
write_once(os.path.join(gpio_dir(sfpflt_pin), 'direction'), 'in')
write_once(os.path.join(gpio_dir(sfpflt_pin), 'edge'), 'both')
def set_led_mode(state):
if state == 1: # phy-def, autonomous blink
write_once(os.path.join(wan_led, 'autonomous'), '1')
elif state == 0: # phy-sfp, user blink
write_once(os.path.join(wan_led, 'autonomous'), '0')
else:
raise Exception("Unknown state %d. Can not happen." % state)
def set_led_brightness(light=False):
write_once(os.path.join(wan_led, 'brightness'), '1' if light else '0')
def set_nic_mode(mode, restart_net=True):
d('Switching mode to %s' % modemap[mode])
with open(sfp_select, 'r') as f:
c = f.read()
if c == modemap[mode]:
d("Current mode is %s . Noop." % c)
return
with open(sfp_select, 'w') as f:
f.write(modemap[mode])
d("Switch success.")
def do_switch(state, restart_net=True):
print 'Switching state to %s' % map[state]
write_once(sfp_select, map[state])
set_led_mode(state)
if restart_net:
time.sleep(cmd_safety_sleep)
d("Restarting net with command %s" % cmd_net_res)
os.system(cmd_net_res)
def oneshot():
init()
f = open(os.path.join(gpio_dir(sfpdet_pin), 'value'), 'r')
state_last = int(f.read().strip())
do_switch(state_last, False)
def led_change(led, sfplos, sfpflt):
led.set_brightness(False if sfplos.read() or sfpflt.read() else True)
def run():
global state_last
def led_init(mode, led, sfplos, sfpflt):
if mode == 1: # phy-def, autonomous blink
led.set_brightness(False)
led.set_autonomous(mode)
if mode == 0: # phy-sfp, user blink
led_change(led, sfplos, sfpflt)
def fdet_changed():
global state_last
fdet.seek(0)
state = int(fdet.read().strip())
if state != state_last:
state_last = state
do_switch(state)
def set_led():
global state_last
flos.seek(0)
fflt.seek(0)
los = int(flos.read().strip())
flt = int(fflt.read().strip())
def mode_change(sfpdet, led, sfplos, sfpflt, restart_net=True):
m = sfpdet.read() # 0: phy-sfp, user blink; 1: phy-def, autonomous blink
set_nic_mode(m, restart_net)
led_init(m, led, sfplos, sfpflt)
set_led_mode(state_last)
if los or flt:
set_led_brightness(False)
else:
set_led_brightness(True)
# Frontend functions
def reset_led():
sfpdet = GPIO(sfpdet_pin, 'in', edge='both')
sfplos = GPIO(sfplos_pin, 'in', edge='both')
sfpflt = GPIO(sfpflt_pin, 'in', edge='both')
led = LED(wan_led)
m = sfpdet.read() # 0: phy-sfp, user blink; 1: phy-def, autonomous blink
led.set_autonomous(m)
led_change(led, sfplos, sfpflt)
def oneshot():
sfpdet = GPIO(sfpdet_pin, 'in', edge='both')
sfpdis = GPIO(sfpdis_pin, 'out', value='0')
sfplos = GPIO(sfplos_pin, 'in', edge='both')
sfpflt = GPIO(sfpflt_pin, 'in', edge='both')
led = LED(wan_led)
mode_change(sfpdet, led, sfplos, sfpflt, False)
def run():
sfpdet = GPIO(sfpdet_pin, 'in', edge='both')
sfpdis = GPIO(sfpdis_pin, 'out', value='0')
sfplos = GPIO(sfplos_pin, 'in', edge='both')
sfpflt = GPIO(sfpflt_pin, 'in', edge='both')
led = LED(wan_led)
def fdet_changed():
d("sfpdet change detected: %d" % sfpdet.read())
mode_change(sfpdet, led, sfplos, sfpflt)
def flos_changed():
set_led()
d("sfplos change detected: %d " % sfplos.read())
led_change(led, sfplos, sfpflt)
def fflt_changed():
set_led()
d("sfpflt change detected: %d" % sfpflt.read())
led_change(led, sfplos, sfpflt)
init()
fdet = open(os.path.join(gpio_dir(sfpdet_pin), 'value'), 'r')
flos = open(os.path.join(gpio_dir(sfplos_pin), 'value'), 'r')
fflt = open(os.path.join(gpio_dir(sfpflt_pin), 'value'), 'r')
mode_change(sfpdet, led, sfplos, sfpflt, True)
po = select.epoll()
po.register(fdet, select.EPOLLPRI)
po.register(flos, select.EPOLLPRI)
po.register(fflt, select.EPOLLPRI)
state_last = int(fdet.read().strip())
do_switch(state_last)
set_led()
po.register(sfpdet.getfd(), select.EPOLLPRI)
po.register(sfplos.getfd(), select.EPOLLPRI)
po.register(sfpflt.getfd(), select.EPOLLPRI)
# main loop
while 1:
events = po.poll(60000)
for e in events:
ef = e[0] # event file descriptor
time.sleep(cmd_safety_sleep)
if ef == fdet.fileno():
if ef == sfpdet.getfd():
fdet_changed()
elif ef == flos.fileno():
elif ef == sfplos.getfd():
flos_changed()
elif ef == fflt.fileno():
elif ef == sfpflt.getfd():
fflt_changed()
else:
raise Exception("Unknown FD. Can not happen.")
......@@ -161,6 +214,7 @@ def help():
--oneshot : set the PHY and restart network, then exit
--nodaemon : run in foreground
--resetled : reset the LED according to current mode, then exit
NO PARAM : daemonize and wait for PHY change
"""
......@@ -168,6 +222,8 @@ def main():
if len(sys.argv) > 1:
if sys.argv[1] == '--oneshot':
oneshot()
elif sys.argv[1] == '--resetled':
reset_led()
elif sys.argv[1] == '--nodaemon':
run()
elif sys.argv[1] == '--help':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment