Skip to content
Snippets Groups Projects
Commit 0f5f7b19 authored by Tomas Hlavacek's avatar Tomas Hlavacek
Browse files

Rewrite sfpswitch for Omnia

Add support for user-forced mode in sfpswitch daemon.
Interface has to be restarted ony in case when the interface
is already up and when the mode has really changed.
Rewrite the command execution to subprocess calls.
Rewrite sfpswitch internal logic.
Make sfpswitch read EEPROM and detect particular modules.
Make sfpswitch use locking.
parent 13080d5b
No related branches found
No related tags found
No related merge requests found
......@@ -3,30 +3,38 @@
# sfpswitch daemon for Turris Omnia
# Copyright (c) 2016 CZ.NIC, z.s.p.o.
force_mode = None
# possible force_mode values are:
# None : autodetection
# 'phy-def' - metallic PHY
# 'phy-sfp' - 1000BASE-X
# 'phy-sfp-noneg' - 1000BASE-X, no up/down in-band signalling (force up)
# 'phy-sfp-sgmii' - SGMII
lockfile = '/var/run/sfpswitch.lock'
debug = False
daemon = False
import sys
import os
import select
import time
import fcntl
import syslog
import getopt
import subprocess
debug = 1
sfpdet_pin = 508
sfpdis_pin = 505
sfplos_pin = 507
sfpflt_pin = 504
sfp_select = '/sys/devices/platform/soc/soc:internal-regs/f1034000.ethernet/net/eth1/phy_select'
cmd_net_res = 'ip link set down dev eth1; /etc/init.d/network restart'
cmd_safety_sleep = 2
wan_led = '/sys/devices/platform/soc/soc:internal-regs/f1011000.i2c/i2c-0/i2c-1/1-002b/leds/omnia-led:wan'
cmd_i2c_read = ['/usr/sbin/i2cget', '-y', '5', '0x50', '0x6', 'b'] # bus i2c-5, chipaddr 0x50, byte 0x6, byte mode
def l(message):
if daemon:
syslog.syslog(message)
else:
sys.stderr.write(message + "\n")
def d(message):
if debug:
print message
l(message)
class GPIO:
......@@ -71,7 +79,7 @@ class GPIO:
def write(self, val):
self.fd.write(val)
self.fd.write(str(val))
def getfd(self):
......@@ -82,138 +90,271 @@ class LED:
def __init__(self, sysfsdir):
self.sysfsdir = sysfsdir
def _get_file(self, filename):
return os.path.join(self.sysfsdir, filename)
def set_autonomous(self, aut):
with open(os.path.join(self.sysfsdir, 'autonomous'), 'w') as f:
with open(self._get_file('autonomous'), 'w') as f:
f.write('1' if aut else '0')
def set_brightness(self, bright):
with open(os.path.join(self.sysfsdir, 'brightness'), 'w') as f:
f.write('1' if bright else '0')
def set_brightness(self, light):
with open(self._get_file('brightness'), 'w') as f:
f.write('255' if light else '0')
class EEPROM:
IOCTL_I2C_SLAVE = 0x0703
def __init__(self, busnum, address=0x50, size=256, pagesize=8):
self.busnum = busnum
self.address = address
self.size = size
self.pagesize = pagesize
def decide_sfpmode(sfpdet_val):
if sfpdet_val == 1: # phy-def, autonomous blink
return 'phy-def'
elif sfpdet_val == 0: # phy-sfp or phy-sgmii-sfp, user blink
p = subprocess.Popen(cmd_i2c_read, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
d("I2C read: %s" % (out + err))
@staticmethod
def _get_dev_filename(busnum):
return "/dev/i2c-%d" % busnum
def open_i2c(self):
self.f = open(self._get_dev_filename(self.busnum), "w+b", 0)
fcntl.ioctl(self.f, self.IOCTL_I2C_SLAVE, self.address)
def read_byte(self, offset):
self.f.write(chr(offset))
return self.f.read(1)
def read_page(self, offset, size):
self.f.write(chr(offset))
return self.f.read(size)
def read_eeprom(self):
self.open_i2c()
for addr in range(0, self.size-1):
offset = addr % self.pagesize
if offset == 0:
b = self.read_page(addr, self.pagesize)
if addr == self.size-1:
self.f.close()
yield b[offset]
class SFP:
def __init__(self, i2cbus):
self.i2cbus = i2cbus
@staticmethod
def detect_metanoia_xdsl(eeprom):
return ['X', 'C', 'V', 'R', '-', '0', '8', '0', 'Y', '5',
'5',] == eeprom[40:51]
@staticmethod
def detect_zisa_gpon(eeprom):
return ['T', 'W', '2', '3', '6', '2', 'H'] == eeprom[40:47]
@staticmethod
def detect_sgmii(eeprom):
if ord(eeprom[6]) & 0x08:
d("Mode selected: generic SGMII")
return True
else:
d("Mode selected: generic 1000BASE-X")
return False
def decide_sfpmode(self):
ec = []
try:
sfpt = int(out, 0)
if sfpt & 0x08:
return 'phy-sgmii-sfp'
else:
return 'phy-sfp'
except:
ec = list(EEPROM(self.i2cbus).read_eeprom())
d("SFP EEPROM: %s" % str(ec))
except Exception as e:
l("EEPROM read error: " + str(e))
return 'phy-sfp'
else:
raise Exception("Can not happen.")
# special case: Metanoia xDSL SFP, 1000BASE-X, no link autonegotiation
if self.detect_metanoia_xdsl(ec):
l("Metanoia DSL SFP detected. Switching to phy-sfp-noneg mode.")
return 'phy-sfp-noneg'
def set_nic_mode(sfpdet_val, restart_net=True):
mode = decide_sfpmode(sfpdet_val)
d('Switching mode to %s.' % mode)
# special case: Zisa GPON SFP, SGMII
if self.detect_zisa_gpon(ec):
l("Zisa GPON SFP detected. Switching to phy-sfp-sgmii mode.")
return 'phy-sfp-sgmii'
with open(sfp_select, 'r') as f:
c = f.read()
if c == mode:
d("Current mode is already %s. Noop." % c)
return
with open(sfp_select, 'w') as f:
f.write(mode)
# SGMII detection
if self.detect_sgmii(ec):
return 'phy-sfp-sgmii'
d("Switch success.")
# default 1000BASE-X
return 'phy-sfp'
if restart_net:
time.sleep(cmd_safety_sleep)
d("Restarting net with command %s" % cmd_net_res)
os.system(cmd_net_res)
def led_change(led, sfplos, sfpflt):
led.set_brightness(False if sfplos.read() or sfpflt.read() else True)
class Omnia:
sfpdet_pin = 508
sfpdis_pin = 505
sfplos_pin = 507
sfpflt_pin = 504
sfp_select = '/sys/devices/platform/soc/soc:internal-regs/f1034000.ethernet/net/eth1/phy_select'
bin_ip = '/sbin/ip'
sfp_iface = 'eth1'
cmd_init_time = 1
wan_led = '/sys/devices/platform/soc/soc:internal-regs/f1011000.i2c/i2c-0/i2c-1/1-002b/leds/omnia-led:wan'
def led_init(sfpdet_val, led, sfplos, sfpflt):
if sfpdet_val == 1: # phy-def, autonomous blink
led.set_brightness(False)
led.set_autonomous(sfpdet_val)
if sfpdet_val == 0: # phy-sfp or phy-sgmii-sfp, user blink
led_change(led, sfplos, sfpflt)
sfp_i2c_bus = 5
sfp_i2c_eeprom_init_time = 1
def __init__(self):
self.sfpdet = GPIO(self.sfpdet_pin, 'in', edge='both')
self.sfplos = GPIO(self.sfplos_pin, 'in', edge='both')
self.sfpflt = GPIO(self.sfpflt_pin, 'in', edge='both')
self.sfpdis = GPIO(self.sfpdis_pin, 'out', edge=None, value=0)
self.led = LED(self.wan_led)
def mode_change(sfpdet, led, sfplos, sfpflt, restart_net=True):
sd = sfpdet.read() # 0: phy-sfp, user blink; 1: phy-def, autonomous blink
set_nic_mode(sd, restart_net)
led_init(sd, led, sfplos, sfpflt)
def set_nic_mode(self, mode):
l('Switching NIC mode to %s.' % mode)
with open(self.sfp_select, 'r') as f:
c = f.read()
if c == mode:
d("Current mode is already %s. Noop." % c)
return False
with open(self.sfp_select, 'w') as f:
f.write(mode)
d("Switched successfully to mode %s." % mode)
return True
# Frontend functions
def restart_net(self):
d("Testing whether the interface %s is up..." % self.sfp_iface)
p = subprocess.Popen([self.bin_ip, 'link', 'show', 'dev', self.sfp_iface],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if ',UP,' in out:
d("Interface %s is up. Sleeping for %d second(s)." % (self.sfp_iface,
self.cmd_init_time))
time.sleep(self.cmd_init_time)
l("Shutting down interface %s" % self.sfp_iface)
subprocess.call([self.bin_ip, 'link', 'set', 'down', 'dev', self.sfp_iface])
l("Bringing up interface %s" % self.sfp_iface)
subprocess.call([self.bin_ip, 'link', 'set', 'up', 'dev', self.sfp_iface])
else:
l("Interface is down. Noop." % self.sfp_iface)
def reset_led():
sfpdet = GPIO(sfpdet_pin, 'in', edge='both')
sfplos = GPIO(sfplos_pin, 'in', edge='both')
sfpflt = GPIO(sfpflt_pin, 'in', edge='both')
led = LED(wan_led)
d("Net restart finished.")
def led_light_handler(self):
if self.sfpdet.read() == 1: # which is weird
d("Warning: SFP status signal changing in PHY mode.")
self.led.set_brightness(False)
return
self.led.set_brightness(False if self.sfplos.read() or
self.sfpflt.read() else True)
def led_mode_handler(self):
sfpdet_val = self.sfpdet.read()
if sfpdet_val == 1: # phy-def, autonomous blink
self.led.set_brightness(False)
self.led.set_autonomous(sfpdet_val)
if sfpdet_val == 0: # phy-sfp or phy-sfp-*, user blink
self.led_light_handler()
def decide_nic_mode(self):
global force_mode
if force_mode:
return force_mode
sfpdet_val = self.sfpdet.read()
if sfpdet_val == 1: # phy-def, autonomous blink
d("Removed SFP, using onboad PHY.")
return 'phy-def'
elif sfpdet_val == 0: # phy-sfp or phy-sfp-*, user blink
d("SFP inserted, setting sfpdis=0")
self.sfpdis.write(0)
d("Going to probe EEPROM after init in %d s." %
self.sfp_i2c_eeprom_init_time)
time.sleep(self.sfp_i2c_eeprom_init_time)
return SFP(self.sfp_i2c_bus).decide_sfpmode()
def nic_mode_handler(self):
# set the NIC mode in /sys
net_res_flag = self.set_nic_mode(self.decide_nic_mode())
# set proper LED mode and turn on/off the LED
self.led_mode_handler()
# restart interface in Linux for changes in /sys to be applied
if net_res_flag:
o.restart_net()
else:
d("Mode not changed. Iface restart not needed.")
m = sfpdet.read() # 0: phy-sfp, user blink; 1: phy-def, autonomous blink
led.set_autonomous(m)
led_change(led, sfplos, sfpflt)
def oneshot():
sfpdet = GPIO(sfpdet_pin, 'in', edge='both')
sfpdis = GPIO(sfpdis_pin, 'out', value='0')
sfplos = GPIO(sfplos_pin, 'in', edge='both')
sfpflt = GPIO(sfpflt_pin, 'in', edge='both')
led = LED(wan_led)
mode_change(sfpdet, led, sfplos, sfpflt, False)
# Frontend functions
def reset_led():
Omnia().led_mode_handler()
def oneshot():
Omnia().nic_mode_handler()
def run():
sfpdet = GPIO(sfpdet_pin, 'in', edge='both')
sfpdis = GPIO(sfpdis_pin, 'out', value='0')
sfplos = GPIO(sfplos_pin, 'in', edge='both')
sfpflt = GPIO(sfpflt_pin, 'in', edge='both')
led = LED(wan_led)
try:
lf = open(lockfile, 'w+')
fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB)
lf.write(str(os.getpid())+"\n")
lf.flush()
except IOError as e:
l('Can not obtain lock file %s. Exit.' % lockfile)
sys.exit(1)
o = Omnia()
def fdet_changed():
d("sfpdet change detected: %d" % sfpdet.read())
mode_change(sfpdet, led, sfplos, sfpflt)
d("sfp det change detected: %d" % o.sfpdet.read())
o.nic_mode_handler()
def flos_changed():
d("sfplos change detected: %d " % sfplos.read())
led_change(led, sfplos, sfpflt)
d("sfp los change detected: %d " % o.sfplos.read())
o.led_light_handler()
def fflt_changed():
d("sfpflt change detected: %d" % sfpflt.read())
led_change(led, sfplos, sfpflt)
d("sfp flt change detected: %d" % o.sfpflt.read())
o.led_light_handler()
mode_change(sfpdet, led, sfplos, sfpflt, True)
# init
fdet_changed()
po = select.epoll()
po.register(sfpdet.getfd(), select.EPOLLPRI)
po.register(sfplos.getfd(), select.EPOLLPRI)
po.register(sfpflt.getfd(), select.EPOLLPRI)
po.register(o.sfpdet.getfd(), select.EPOLLPRI)
po.register(o.sfplos.getfd(), select.EPOLLPRI)
po.register(o.sfpflt.getfd(), select.EPOLLPRI)
# main loop
while 1:
while True:
events = po.poll(60000)
for e in events:
ef = e[0] # event file descriptor
if ef == sfpdet.getfd():
if ef == o.sfpdet.getfd():
fdet_changed()
elif ef == sfplos.getfd():
elif ef == o.sfplos.getfd():
flos_changed()
elif ef == sfpflt.getfd():
elif ef == o.sfpflt.getfd():
fflt_changed()
else:
raise Exception("Unknown FD. Can not happen.")
......@@ -223,40 +364,69 @@ def create_daemon():
try:
pid = os.fork()
if pid > 0:
print 'PID: %d' % pid
os._exit(0)
sys.exit(0)
except OSError, error:
print 'Unable to fork. Error: %d (%s)' % (error.errno, error.strerror)
os._exit(1)
except OSError as e:
l('Unable to fork. Error: %s' % str(e))
sys.exit(1)
run()
def help():
print """sfpswitch.py daemon for Turris Omnia
--oneshot : set the PHY and restart network, then exit
--nodaemon : run in foreground
--resetled : reset the LED according to current mode, then exit
NO PARAM : daemonize and wait for PHY change
-o --oneshot : set the PHY and restart network, then exit
-n --nodaemon : run in foreground
-r --resetled : reset the LED according to current mode, then exit
-d --debug : turn on debug output
NO PARAM : daemonize and wait in loop for PHY change
"""
def main():
if len(sys.argv) > 1:
if sys.argv[1] == '--oneshot':
oneshot()
elif sys.argv[1] == '--resetled':
reset_led()
elif sys.argv[1] == '--nodaemon':
run()
elif sys.argv[1] == '--help':
global debug, daemon
daemon = False
oneshot_flag = False
resetled_flag = False
nodaemon_flag = False
optlist = args = []
try:
optlist, args = getopt.getopt(sys.argv[1:], "ornhd",
['oneshot', 'resetled', 'nodaemon', 'help', 'debug'])
except getopt.GetoptError as err:
print str(err)+"\n"
help()
sys.exit(1)
for o, a in optlist:
if o == '--oneshot' or o == '-o':
oneshot_flag = True
elif o == '--resetled' or o == '-r':
resetled_flag = True
elif o == '--nodaemon' or o == '-n':
nodaemon_flag = True
elif o == '--help' or o == '-h':
help()
sys.exit(0)
elif o == '--debug' or o == '-d':
debug = True
else:
print "Unknown option: %s" % sys.argv[1]
print "Unknown option: %s" % o
help()
sys.exit(1)
if oneshot_flag:
oneshot()
elif resetled_flag:
reset_led()
elif nodaemon_flag:
run()
else:
daemon = True
create_daemon()
if __name__ == '__main__':
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment