Open Ethernet Networking (OpEN) API Guide and Reference Manual  3.6.0.3
vlan_example.py
1 /*! \file vlan_example.py
2  */
3 
4 import OpEN_py as OpEN
5 from OpENUtil import *
6 from time import time
7 from select import select
8 import fcntl
9 import os
10 import errno
11 
12 
13 #
14 # Copyright 2016 Broadcom.
15 #
16 # Licensed under the Apache License, Version 2.0 (the "License");
17 # you may not use this file except in compliance with the License.
18 # You may obtain a copy of the License at
19 #
20 # http://www.apache.org/licenses/LICENSE-2.0
21 #
22 # Unless required by applicable law or agreed to in writing, software
23 # distributed under the License is distributed on an "AS IS" BASIS,
24 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 # See the License for the specific language governing permissions and
26 # limitations under the License.
27 #
28 
29 
30 #
31 # Python 2.6.6
32 #
33 
34 class Vlan :
35  def __init__(self):
36  self.m_vlan_exists = False
37  self.m_staticVlan = False
38 
39 class VlanExample :
40  def __init__(self, client) :
41  self.m_client = client
42  self.m_vlan_table = []
43 
44  def vlan_status_show(self):
45  print "\n\nThe following VLAN IDs are in the switch's VLAN database:\n"
46  size = 4096
47  for i in xrange(0, size - 1):
48  if self.m_vlan_table[i].vlan_exists == True :
49  res = "VLAN: " + str(i) + " is "
50  if self.m_vlan_table[i].staticVlan == True :
51  res += "Static"
52  else :
53  res += "Dynamic"
54  print res
55 
56  def vlan_table_validate(self):
57  size = 4096
58  for vlan_num in xrange(0, size - 1) :
59  if self.m_vlan_table[vlan_num].exists == True :
60  ret = OpEN.openapiVlanCreatedCheck(self.m_client, vlan_num)
61  if ret == OPEN_E_NONE :
62  self.m_vlan_table[vlan_num].staticVlan = OpEN.openapiVlanIsStatic(self.m_client, vlan_num)
63  else :
64  self.m_vlan_table[vlan_num].vlan_exists = False;
65  self.m_vlan_table[vlan_num].staticVlan = False;
66 
67  def vlan_table_create(self):
68  size = 4096
69  self.m_vlan_table = []
70 
71  open_event = OpEN.openapiEventList_t()
72  OpEN.openapiEventListClear(open_event)
73  OpEN.openapiEventSet(open_event, OpEN.OPEN_EVENT_VLAN)
74 
75  ret = OpEN.openapiEventRegister(self.m_client, open_event)
76  if ret != OpEN.OPEN_E_NONE :
77  print "Failed to register for OPEN_EVENT_VLAN events.\n"
78  else :
79  print "Successfully registered for OPEN_EVENT_VLAN events.\n"
80 
81  size -= 1
82  for vlan_num in xrange(0, size):
83  vlan = Vlan()
84  ret = OpEN.openapiVlanCreatedCheck(self.m_client, vlan_num)
85  if ret == OpEN.OPEN_E_NONE :
86  vlan.vlan_exists = True
87  vlan.staticVlan = OpEN.openapiVlanIsStatic(self.m_client, vlan_num)
88  else :
89  vlan.vlan_exists = False
90  vlan.staticVlan = False
91  self.m_vlan_table.append(vlan)
92 
93  vlan_num = 1
94  while True :
95  self.m_vlan_table[vlan_num].vlan_exists = True
96  self.m_vlan_table[vlan_num].staticVlan = OpEN.openapiVlanIsStatic(self.m_client, vlan_num)
97  vlan_num_p = OpEN.new_uint32_tp()
98  ret = OpEN.openapiVlanNextGet(self.m_client, vlan_num, vlan_num_p)
99  if ret == OpEN.OPEN_E_NONE:
100  vlan_num = OpEN.uint32_tp_value(vlan_num_p)
101  else:
102  break
103  OpEN.delete_uint32_tp(vlan_num_p)
104 
105  def vlan_table_monitor(self, duration):
106  change_pending_event = OpEN.openapiEventList_t()
107  OpEN.openapiEventListClear(change_pending_event)
108  purge_event = OpEN.openapiEventList_t()
109  OpEN.openapiEventListClear(purge_event)
110 
111  end_time = time() + duration
112  notify_fd = OpEN.openapiClientNotifySocketFDGet(self.m_client)
113  if notify_fd == 0 :
114  print "Invalid notify_fd retrieved"
115  else :
116  fcntl.fcntl(notify_fd, fcntl.F_SETFL, os.O_NONBLOCK)
117  print "notify_fd is %d\n" % notify_fd
118  rd_fds = [notify_fd]
119  wr_fds = []
120  ex_fds = []
121  while time() < end_time :
122  rr, wr, er = select(rd_fds, wr_fds, ex_fds, 5)
123  if len(rr) :
124  while True:
125  s = ""
126  try:
127  s = os.read(notify_fd, 1024)
128  except os.error, e:
129  if e.errno == errno.EWOULDBLOCK:
130  break
131  if len(s) > 0 :
132  continue
133  else :
134  break
135  ret = OpEN.openapiPendingEventsGet(self.m_client, change_pending_event.m_open_event, purge_event.m_open_event)
136  if ret == OpEN.OPEN_E_NONE :
137  if purge_event.is_set(OpEN.OPEN_EVENT_VLAN) == True :
138  print "Got Purge event for OPEN_EVENT_VLAN"
139  vlan_table_validate()
140  if change_pending_event.is_set(OpEN.OPEN_EVENT_VLAN) == True :
141  print "Got Change Pending event for OPEN_EVENT_VLAN"
142 
143  vlan_num = 0
144  vlan_num_p = OpEN.new_uint32_tp()
145  delete_pending_p = OpEN.new_uint32_tp()
146  while True :
147  ret = OpEN.openapiVlanNextChangedGet(self.m_client, vlan_num, vlan_num_p, delete_pending_p)
148  if ret == OpEN.OPEN_E_NONE :
149  vlan_num = OpEN.uint32_tp_value(vlan_num_p)
150  delete_pending = OpEN.uint32_tp_value(delete_pending_p)
151  if delete_pending != 0 :
152  print "Deleting VLAN ID: %d" % vlan_num
153  self.m_vlan_table[vlan_num].vlan_exists = False
154  self.m_vlan_table[vlan_num].staticVlan = False
155  else :
156  print "Change event received for VLAN ID: %d" % vlan_num
157  ret2 = OpEN.openapiVlanCreatedCheck(self.m_client, vlan_num)
158  if ret2 == OpEN.OPEN_E_NONE :
159  self.m_vlan_table[vlan_num].vlan_exists = True
160  self.m_vlan_table[vlan_num].staticVlan = OpEN.openapiVlanIsStatic(self.m_client, vlan_num)
161  else :
162  self.m_vlan_table[vlan_num].vlan_exists = False
163  self.m_vlan_table[vlan_num].staticVlan = False
164  self.vlan_status_show()
165  else :
166  break
167  OpEN.delete_uint32_tp(vlan_num_p)
168  OpEN.delete_uint32_tp(delete_pending_p)
169 
170 open = OpENUtil()
171 ret = open.connect("vlan_example")
172 if ret == OpEN.OPEN_E_NONE :
173  open.getNetworkOSVersion()
174  open.getAPIVersion()
175  client = open.get_client()
176  vlanExample = VlanExample(client)
177  vlanExample.vlan_table_create()
178  vlanExample.vlan_status_show()
179  vlanExample.vlan_table_monitor(60)
180  vlanExample.vlan_status_show()
181  open.terminate()
182 else :
183  print "Unable to connect"
184