1 /*! \file bgp_example.py
6 """bgp_example.py: OpEN API Border Gateway Protocol (BGP) configuration example"""
def ip_to_int(addr):
    """Convert a dotted-quad IPv4 string to an unsigned 32-bit integer.

    The text is packed with socket.inet_aton() and the four resulting bytes
    are read back as one network-order (big-endian) unsigned int, so
    '10.1.1.1' becomes 0x0A010101.

    Any exception raised by socket.inet_aton() for text it cannot parse is
    propagated to the caller.
    """
    packed = socket.inet_aton(addr)
    (value,) = struct.unpack("!I", packed)
    return value
def int_to_ip(addr):
    """Convert an unsigned 32-bit integer to a dotted-quad IPv4 string.

    The integer is packed as a network-order (big-endian) unsigned int and
    rendered with socket.inet_ntoa(), so 0x0A010101 becomes '10.1.1.1'.

    NOTE(review): the function name is inferred from the docstring and from
    the naming of int_to_ipv6; confirm against the complete file.
    """
    packed = struct.pack("!I", addr)
    return socket.inet_ntoa(packed)
def ipv6_to_int(addr):
    """Convert an IPv6 address string to a 128-bit integer.

    socket.inet_pton() yields the 16 address bytes; they are read as two
    network-order unsigned 64-bit halves and recombined with the first half
    in the upper 64 bits, so '::1' becomes 1.

    Any exception raised by socket.inet_pton() for text it cannot parse is
    propagated to the caller.
    """
    packed = socket.inet_pton(socket.AF_INET6, addr)
    high, low = struct.unpack('!2Q', packed)
    # Mirror of int_to_ipv6(), which splits the value at bit 64.
    return (high << 64) | low
def int_to_ipv6(addr):
    """Convert a 128-bit integer to an IPv6 address string.

    The value is split into its upper and lower 64 bits, packed as two
    network-order unsigned 64-bit integers and rendered with
    socket.inet_ntop(), so 1 becomes '::1'.
    """
    low_mask = (1 << 64) - 1
    high = addr >> 64
    low = addr & low_mask
    packed = struct.pack('!2Q', high, low)
    return socket.inet_ntop(socket.AF_INET6, packed)
# Build an OpEN open_inet_addr_t from a textual IP address and hand it back
# together with a result code: the return value is the 3-tuple
# (result, addr, type).
# NOTE(review): this view of the function is flattened -- indentation and the
# lines that select between the four `result` assignments below are not
# visible, so the exact branching cannot be stated from here.
# NOTE(review): `type` is returned but no assignment to it is visible; callers
# in this file concatenate it with a string, so it is presumably a short
# address-family label -- confirm against the complete file. The name also
# shadows the `type` builtin inside this function.
62 def ip_to_open_inet(address):
63 """ Convert ip address to integer and family and return as open_inet_addr_t """
# Start from a fresh address record whose family is OPEN_AF_NONE.
66 addr = OpEN.open_inet_addr_t()
67 addr.family = OpEN.OPEN_AF_NONE
# IPv4 outcome: store the integer from ip_to_int(), mark the family as
# OPEN_AF_INET and report OPEN_E_NONE.
69 addr.addr.ipv4 = ip_to_int(address)
70 addr.family = OpEN.OPEN_AF_INET
71 result = OpEN.OPEN_E_NONE
# IPv6 outcome: same shape, using ipv6_to_int() and OPEN_AF_INET6.
76 addr.addr.ipv6 = ipv6_to_int(address)
77 addr.family = OpEN.OPEN_AF_INET6
78 result = OpEN.OPEN_E_NONE
# Two further outcomes report OPEN_E_PARAM and OPEN_E_UNAVAIL; the conditions
# that lead to each are not visible in this view.
81 result = OpEN.OPEN_E_PARAM
84 result = OpEN.OPEN_E_UNAVAIL
86 return (result, addr, type)
def print_sanity_results(result, test, msg, feat):
    """Print the overall outcome of one sanity check.

    result -- OpEN result code of the API sequence that was exercised
    test   -- outcome of the read-back comparison
    msg    -- short description of what was tested
    feat   -- detail (usually the value that was configured) appended to msg

    Exactly one line is printed:
      * "Sanity test skipped" when result is OPEN_E_UNAVAIL,
      * "Sanity Success" when result is OPEN_E_NONE and test equals True,
      * "Sanity Failure" otherwise.
    """
    if result == OpEN.OPEN_E_UNAVAIL:
        verdict = "Sanity test skipped"
    # `test == True` is kept as an equality check on purpose: it preserves the
    # original comparison rather than switching to general truthiness.
    elif result == OpEN.OPEN_E_NONE and test == True:  # noqa: E712
        verdict = "Sanity Success"
    else:
        verdict = "Sanity Failure"

    # A single parenthesised argument prints identically as a Python 2 print
    # statement and as a Python 3 print() call.
    print("%s - %s - %s." % (verdict, msg, feat))
def print_bad_result(result, msg):
    """Print a general error message when an OpEN call did not succeed.

    result -- OpEN result code returned by the call
    msg    -- label identifying the call (normally the API function name)

    Nothing is printed when result is OPEN_E_NONE. OPEN_E_UNAVAIL is reported
    as an unsupported feature; every other code is reported as a test failure.
    """
    if result == OpEN.OPEN_E_NONE:
        return

    if result == OpEN.OPEN_E_UNAVAIL:
        template = "Feature not supported - %s (err %d)."
    else:
        template = "Test Failure - %s (err %d)."

    # Parenthesised single argument: same output under Python 2 and Python 3.
    print(template % (msg, result))
def get_first_routing_protocol_name(client_handle):
    """Return the name of the first routing protocol that reports one.

    The maximum protocol-name length is queried first and used to size a
    string buffer. openapiIpRouterProtoNameNextGet() is then called up to
    XX_PROTO_MAX times, each time continuing from the id the previous call
    reported, until a call succeeds with a non-empty name.

    Returns the protocol name, or '' when the length query fails, the buffer
    cannot be created, or no call yields a non-empty name.

    NOTE(review): the value returned on the failure paths and the starting
    protocol id are not visible in the flattened source. '' is used because
    the caller in this file applies len() to the result, and 0 is assumed to
    mean "start from the beginning" -- confirm both against the complete file.
    """
    max_proto_len_p = OpEN.new_uint32_tp()
    try:
        result = OpEN.openapiRouteProtoNameLenMax(client_handle, max_proto_len_p)
        print_bad_result(result, 'openapiRouteProtoNameLenMax')
        if result != OpEN.OPEN_E_NONE:
            return ''
        # One byte more than the reported maximum, as in the original code.
        max_proto_len = OpEN.uint32_tp_value(max_proto_len_p) + 1
    finally:
        # Released on every path; the early return inside the search loop used
        # to skip this call.
        OpEN.delete_uint32_tp(max_proto_len_p)

    try:
        proto_buff_string = open_.getStringBuffer(max_proto_len)
    except OpENBufferSizeError:
        print("get_first_routing_protocol_name: getStringBuffer raised OpENBufferSizeError")
        return ''
    except TypeError:
        print("get_first_routing_protocol_name: getStringBuffer raised TypeError")
        return ''

    proto_buff = OpEN.open_buffdesc()
    proto_buff.pstart = proto_buff_string
    proto_buff.size = max_proto_len

    proto_id = 0
    next_proto_id_p = OpEN.new_uint32_tp()
    try:
        # range() behaves the same as xrange() here and also runs on Python 3.
        for _ in range(0, XX_PROTO_MAX):
            result = OpEN.openapiIpRouterProtoNameNextGet(
                client_handle, proto_id, proto_buff, next_proto_id_p)
            # The buffer is only inspected after a successful call.
            if result == OpEN.OPEN_E_NONE and len(proto_buff_string.cast()) > 0:
                return proto_buff_string.cast()
            proto_id = OpEN.uint32_tp_value(next_proto_id_p)
    finally:
        OpEN.delete_uint32_tp(next_proto_id_p)

    return ''
def get_first_routing_interface(client_handle):
    """Get the first available routing interface.

    Calls openapiRtrIntfNextGet() once and returns the tuple (result, intf):
    the OpEN result code of that call and the interface number read from the
    output holder. The interface number is read regardless of the result
    code, so callers must check result before using intf.

    NOTE(review): the starting interface passed to the call is not visible in
    the flattened source; 0 is assumed to mean "start from the beginning" --
    confirm against the complete file.
    """
    start_intf = 0
    intf_p = OpEN.new_uint32_tp()
    try:
        result = OpEN.openapiRtrIntfNextGet(client_handle, start_intf, intf_p)
        intf = OpEN.uint32_tp_value(intf_p)
    finally:
        # Release the holder even if one of the calls above raises.
        OpEN.delete_uint32_tp(intf_p)

    return (result, intf)
156 """Simple BGP class implementing basic CRUD examples """
def __init__(self, client):
    """Store the OpEN client handle that every test method passes to the API."""
    self.m_client = client
def test_local_as(self, asn):
    """Set the local BGP autonomous system number and verify it by read-back.

    asn -- autonomous system number to configure

    The value is written with openapiBgpLocalASSet(); when that succeeds it is
    read back with openapiBgpLocalASGet(). A sanity line is printed comparing
    asn with the contents of the read-back holder.

    NOTE(review): nesting is inferred from the flattened source -- the sanity
    report and the holder release are taken to run unconditionally, since the
    holder is allocated unconditionally and failures must be reported too.
    """
    result = OpEN.openapiBgpLocalASSet(self.m_client, asn)
    print_bad_result(result, 'openapiBgpLocalASSet')

    readback_p = OpEN.new_uint32_tp()
    try:
        if result == OpEN.OPEN_E_NONE:
            # Fetch the value again to demonstrate the getter.
            result = OpEN.openapiBgpLocalASGet(self.m_client, readback_p)
            print_bad_result(result, 'openapiBgpLocalASGet')

        matches = asn == OpEN.uint32_tp_value(readback_p)
        print_sanity_results(result, matches, 'local asn', str(asn))
    finally:
        OpEN.delete_uint32_tp(readback_p)
def test_local_id(self, address):
    """Set the BGP router id to the given IPv4 address and verify it.

    address -- router id as a dotted-quad IPv4 string

    The string is converted with ip_to_int() and written with
    openapiBgpLocalIdSet(); when that succeeds it is read back with
    openapiBgpLocalIdGet(). A sanity line is printed comparing the integer
    form of address with the contents of the read-back holder.

    NOTE(review): nesting is inferred from the flattened source -- the sanity
    report and the holder release are taken to run unconditionally.
    """
    readback_p = OpEN.new_uint32_tp()
    try:
        router_id = ip_to_int(address)

        result = OpEN.openapiBgpLocalIdSet(self.m_client, router_id)
        print_bad_result(result, 'openapiBgpLocalIdSet')

        if result == OpEN.OPEN_E_NONE:
            result = OpEN.openapiBgpLocalIdGet(self.m_client, readback_p)
            # The label used to read 'openapiBgpLocalASGet', naming the wrong
            # API in the error output.
            print_bad_result(result, 'openapiBgpLocalIdGet')

        matches = router_id == OpEN.uint32_tp_value(readback_p)
        print_sanity_results(result, matches, 'local id', address)
    finally:
        OpEN.delete_uint32_tp(readback_p)
def test_local_pref(self, preference):
    """Set the BGP default local preference and verify it by read-back.

    preference -- local preference value to configure

    The value is written with openapiBgpLocalPrefSet(); when that succeeds it
    is read back with openapiBgpLocalPrefGet(). A sanity line is printed
    comparing preference with the contents of the read-back holder.

    NOTE(review): nesting is inferred from the flattened source -- the sanity
    report and the holder release are taken to run unconditionally.
    """
    result = OpEN.openapiBgpLocalPrefSet(self.m_client, preference)
    print_bad_result(result, 'openapiBgpLocalPrefSet')

    readback_p = OpEN.new_uint32_tp()
    try:
        if result == OpEN.OPEN_E_NONE:
            result = OpEN.openapiBgpLocalPrefGet(self.m_client, readback_p)
            print_bad_result(result, 'openapiBgpLocalPrefGet')

        matches = preference == OpEN.uint32_tp_value(readback_p)
        print_sanity_results(result, matches, 'local pref', str(preference))
    finally:
        OpEN.delete_uint32_tp(readback_p)
def test_distance(self, external_distance, internal_distance, local_distance):
    """Set the BGP external, internal and local distances and verify each.

    external_distance -- preference to configure for OPEN_PREF_EBGP
    internal_distance -- preference to configure for OPEN_PREF_IBGP
    local_distance    -- preference to configure for OPEN_PREF_LOCAL_BGP

    For each of the three, in that order, the value is written with
    openapiIpRouterPreferenceSet(); when that succeeds it is read back with
    openapiIpRouterPreferenceGet(), and a sanity line is printed comparing the
    requested value with the contents of the read-back holder.

    NOTE(review): nesting is inferred from the flattened source -- each sanity
    report and holder release is taken to run unconditionally.
    """
    # (preference type, requested value, API label suffix, sanity label)
    checks = (
        (OpEN.OPEN_PREF_EBGP, external_distance, 'EBGP', 'external distance'),
        (OpEN.OPEN_PREF_IBGP, internal_distance, 'IBGP', 'internal distance'),
        (OpEN.OPEN_PREF_LOCAL_BGP, local_distance, 'LOCAL', 'local distance'),
    )

    for pref_type, wanted, tag, label in checks:
        result = OpEN.openapiIpRouterPreferenceSet(self.m_client, pref_type, wanted)
        print_bad_result(result, 'openapiIpRouterPreferenceSet ' + tag)

        readback_p = OpEN.new_uint32_tp()
        try:
            if result == OpEN.OPEN_E_NONE:
                result = OpEN.openapiIpRouterPreferenceGet(self.m_client, pref_type, readback_p)
                print_bad_result(result, 'openapiIpRouterPreferenceGet ' + tag)

            matches = wanted == OpEN.uint32_tp_value(readback_p)
            print_sanity_results(result, matches, label, str(wanted))
        finally:
            OpEN.delete_uint32_tp(readback_p)
def test_global_hold_time(self, seconds):
    """Set the globally configured BGP hold time and verify it by read-back.

    seconds -- hold time to configure

    The value is written with openapiBgpGlobalHoldTimeConfiguredSet(); when
    that succeeds it is read back with openapiBgpGlobalHoldTimeConfiguredGet().
    A sanity line is printed comparing seconds with the contents of the
    read-back holder. No peer is involved: both calls take only the client
    handle and the value.

    NOTE(review): nesting is inferred from the flattened source -- the sanity
    report and the holder release are taken to run unconditionally.
    """
    result = OpEN.openapiBgpGlobalHoldTimeConfiguredSet(self.m_client, seconds)
    print_bad_result(result, 'openapiBgpGlobalHoldTimeConfiguredSet')

    readback_p = OpEN.new_uint32_tp()
    try:
        if result == OpEN.OPEN_E_NONE:
            result = OpEN.openapiBgpGlobalHoldTimeConfiguredGet(self.m_client, readback_p)
            print_bad_result(result, 'openapiBgpGlobalHoldTimeConfiguredGet')

        matches = seconds == OpEN.uint32_tp_value(readback_p)
        print_sanity_results(result, matches, 'global hold time', str(seconds))
    finally:
        OpEN.delete_uint32_tp(readback_p)
def test_global_keep_alive(self, seconds):
    """Set the globally configured BGP keep-alive time and verify it.

    seconds -- keep-alive time to configure

    The value is written with openapiBgpGlobalKeepAliveConfiguredSet(); when
    that succeeds it is read back with
    openapiBgpGlobalKeepAliveConfiguredGet(). A sanity line is printed
    comparing seconds with the contents of the read-back holder. No peer is
    involved: both calls take only the client handle and the value.

    NOTE(review): nesting is inferred from the flattened source -- the sanity
    report and the holder release are taken to run unconditionally.
    """
    result = OpEN.openapiBgpGlobalKeepAliveConfiguredSet(self.m_client, seconds)
    print_bad_result(result, 'openapiBgpGlobalKeepAliveConfiguredSet')

    readback_p = OpEN.new_uint32_tp()
    try:
        if result == OpEN.OPEN_E_NONE:
            result = OpEN.openapiBgpGlobalKeepAliveConfiguredGet(self.m_client, readback_p)
            print_bad_result(result, 'openapiBgpGlobalKeepAliveConfiguredGet')

        matches = seconds == OpEN.uint32_tp_value(readback_p)
        print_sanity_results(result, matches, 'global keep alive time', str(seconds))
    finally:
        OpEN.delete_uint32_tp(readback_p)
def test_log_neighbor(self, log_changes):
    """Set whether BGP logs neighbor changes and verify it by read-back.

    log_changes -- flag to configure

    The flag is written with openapiBgpLogNeighborChangesSet(); when that
    succeeds it is read back with openapiBgpLogNeighborChangesGet(). A sanity
    line is printed comparing log_changes with the contents of the read-back
    holder.

    NOTE(review): nesting is inferred from the flattened source -- the sanity
    report and the holder release are taken to run unconditionally.
    """
    result = OpEN.openapiBgpLogNeighborChangesSet(self.m_client, log_changes)
    print_bad_result(result, 'openapiBgpLogNeighborChangesSet')

    readback_p = OpEN.new_bool_tp()
    try:
        if result == OpEN.OPEN_E_NONE:
            result = OpEN.openapiBgpLogNeighborChangesGet(self.m_client, readback_p)
            print_bad_result(result, 'openapiBgpLogNeighborChangesGet')

        matches = log_changes == OpEN.bool_tp_value(readback_p)
        print_sanity_results(result, matches, 'log neighbor', str(log_changes))
    finally:
        OpEN.delete_bool_tp(readback_p)
def test_max_paths(self, af, is_ibgp):
    """Set the maximum number of BGP paths and verify it by read-back.

    af      -- address family passed to the API
    is_ibgp -- flag passed to the API; also selects the 'Internal' or
               'External' label in the sanity message

    The current value is fetched with openapiBgpMaxPathsGet(), written back
    with openapiBgpMaxPathsSet(), and fetched a second time into a separate
    holder. Each step runs only when the previous one returned OPEN_E_NONE.
    A sanity line is printed comparing the two fetched values.

    NOTE(review): nesting, and the is_ibgp test that chooses between the two
    labels, are inferred from the flattened source -- confirm against the
    complete file.
    """
    readback_p = OpEN.new_uint32_tp()
    max_p = OpEN.new_uint32_tp()
    try:
        result = OpEN.openapiBgpMaxPathsGet(self.m_client, af, is_ibgp, max_p)
        print_bad_result(result, 'openapiBgpMaxPathsGet')

        if result == OpEN.OPEN_E_NONE:
            result = OpEN.openapiBgpMaxPathsSet(
                self.m_client, af, is_ibgp, OpEN.uint32_tp_value(max_p))
            print_bad_result(result, 'openapiBgpMaxPathsSet')

        if result == OpEN.OPEN_E_NONE:
            result = OpEN.openapiBgpMaxPathsGet(self.m_client, af, is_ibgp, readback_p)
            print_bad_result(result, 'openapiBgpMaxPathsGet')

        max_v = OpEN.uint32_tp_value(max_p)
        if is_ibgp:
            bgp = 'Internal (%d)' % max_v
        else:
            bgp = 'External (%d)' % max_v

        matches = max_v == OpEN.uint32_tp_value(readback_p)
        print_sanity_results(result, matches, 'max paths', bgp)
    finally:
        OpEN.delete_uint32_tp(readback_p)
        OpEN.delete_uint32_tp(max_p)
# Configure a network for BGP via openapiBgpNetworkAddDelete() and print a
# sanity line for the outcome.
# NOTE(review): this view of the method is flattened -- indentation, the
# `try:` / `except TypeError:` lines around getStringBuffer() and all but the
# first argument of the API call are not visible. Take them from the complete
# file before changing this code.
330 def test_network(self, normal_mode, use_route_map, address, prefix, route_map_name) :
331 """ Configures an arbitrary network to advertise via BGP. """
# ip_to_open_inet() returns (result, addr, type); `type` is only used to
# label the sanity message.
333 result, addr, type = ip_to_open_inet(address)
334 msg = type +
' peer network'
# Work is guarded by the address having parsed into a known family.
336 if addr.family != OpEN.OPEN_AF_NONE:
# Copy the route-map name into an OpEN string buffer of len + 1 bytes; the two
# prints below belong to the OpENBufferSizeError and TypeError handlers.
338 name_string = open_.getStringBuffer(len(route_map_name) + 1, route_map_name)
339 except OpENBufferSizeError:
340 print(
"test_network: getStringBuffer raised OpENBufferSizeError")
343 print(
"test_network: getStringBuffer raised TypeError")
# Describe the string buffer with an open_buffdesc (start pointer and size).
345 name_buff = OpEN.open_buffdesc()
346 name_buff.pstart = name_string
347 name_buff.size = len(route_map_name) + 1
# NOTE(review): only the client handle is visible in this call; the remaining
# arguments are missing from this view.
348 result = OpEN.openapiBgpNetworkAddDelete(self.m_client,
354 print_bad_result(result,
'openapiBgpNetworkAddDelete')
# There is no read-back here: the comparison flag is the constant True, so the
# sanity outcome depends on `result` alone.
356 print_sanity_results(result,
True, msg, address)
# Configure BGP route redistribution for the first routing protocol name
# returned by get_first_routing_protocol_name(), then print a sanity line.
# NOTE(review): this view of the method is flattened -- indentation, the
# `try:` / `except TypeError:` lines around both getStringBuffer() calls and
# all but the first argument of openapiBgpRedistributionSet() are not visible.
# Which of the method parameters reach the API, and in what order, must be
# taken from the complete file.
358 def test_redistribution(self, add, match_set, match_bits, metric_set,
359 metric_value_set, metric_value, route_map_set, af, route_map_name) :
360 """ Configures an arbitrary BGP route redistribution. """
# Default outcome when no protocol name is available.
362 result = OpEN.OPEN_E_FAIL
364 proto_name = get_first_routing_protocol_name(self.m_client)
# Guard: a non-empty protocol name is required to continue.
365 if len(proto_name)>0:
# Protocol name -> OpEN string buffer of len + 1 bytes, then an open_buffdesc.
367 proto_string = open_.getStringBuffer(len(proto_name) + 1, proto_name)
368 except OpENBufferSizeError:
369 print(
"test_redistribution: getStringBuffer raised OpENBufferSizeError")
372 print(
"test_redistribution: getStringBuffer raised TypeError")
374 proto_buff = OpEN.open_buffdesc()
375 proto_buff.pstart = proto_string
376 proto_buff.size = len(proto_name) + 1
# Route-map name -> OpEN string buffer of len + 1 bytes, then an open_buffdesc.
379 route_string = open_.getStringBuffer(len(route_map_name) + 1, route_map_name)
380 except OpENBufferSizeError:
381 print(
"test_redistribution: getStringBuffer raised OpENBufferSizeError")
384 print(
"test_redistribution: getStringBuffer raised TypeError")
387 route_buff = OpEN.open_buffdesc()
388 route_buff.pstart = route_string
389 route_buff.size = len(route_map_name) + 1
# NOTE(review): only the client handle is visible in this call.
391 result = OpEN.openapiBgpRedistributionSet(self.m_client,
402 print_bad_result(result,
'openapiBgpRedistributionSet')
# No read-back: the comparison flag is the constant True, so the sanity
# outcome depends on `result` alone.
404 print_sanity_results(result,
True,
'redistribution', route_map_name)
# Create a peer with a remote AS number via openapiBgpPeerRemoteASSet(), read
# the value back via openapiBgpPeerRemoteASGet() and print a sanity line.
# NOTE(review): this view of the method is flattened -- indentation is gone
# and both API calls show only the client handle; the remaining arguments
# must be taken from the complete file.
406 def test_peer_remote_as(self, asn, address, scope_id) :
407 """ Create a peer with a given IP address and remote AS number.
408 The remote AS number is then read and compared to verify the operation."""
# ip_to_open_inet() returns (result, addr, type); `type` only labels the
# message. The % formatting applies to the literal before the concatenation.
410 result, addr, type = ip_to_open_inet(address)
411 msg = type +
' peer remote asn %d scope %d' % (asn, scope_id)
# Set step, guarded by the address having parsed into a known family.
413 if addr.family != OpEN.OPEN_AF_NONE:
414 result = OpEN.openapiBgpPeerRemoteASSet(self.m_client,
418 print_bad_result(result,
'openapiBgpPeerRemoteASSet')
# Read-back step, guarded by the previous result being OPEN_E_NONE. tmp_p is
# presumably the output holder passed to the Get call -- confirm.
420 tmp_p = OpEN.new_uint32_tp()
421 if result == OpEN.OPEN_E_NONE:
422 result = OpEN.openapiBgpPeerRemoteASGet(self.m_client,
426 print_bad_result(result,
'openapiBgpPeerRemoteASGet')
# Compare the requested AS number with the holder contents, then release the
# holder allocated by new_uint32_tp().
428 print_sanity_results(result, asn == OpEN.uint32_tp_value(tmp_p), msg, address)
429 OpEN.delete_uint32_tp(tmp_p)
# Set a peer's administrative status via openapiBgpPeerAdminStatusSet(), read
# it back via openapiBgpPeerAdminStatusGet() and print a sanity line.
# NOTE(review): the docstring below describes creating a peer with a remote AS
# number; it appears to be copied from test_peer_remote_as and does not match
# the calls made here.
# NOTE(review): this view of the method is flattened -- indentation is gone
# and both API calls are truncated; how `def_value` is used is not visible.
431 def test_peer_admin_status(self, address, scope_id, status, def_value) :
432 """ Create a peer with a given IP address and remote AS number.
433 The remote AS number is then read and compared to verify the operation."""
# ip_to_open_inet() returns (result, addr, type); `type` only labels the message.
435 result, addr, type = ip_to_open_inet(address)
436 msg = type +
' peer admin status'
# Set step, guarded by the address having parsed into a known family.
438 if addr.family != OpEN.OPEN_AF_NONE:
439 result = OpEN.openapiBgpPeerAdminStatusSet(self.m_client,
444 print_bad_result(result,
'openapiBgpPeerAdminStatusSet')
# Read-back step, guarded by the previous result being OPEN_E_NONE; the Get
# call is given OPEN_BGP_GET_FINAL as its second argument.
446 tmp_p = OpEN.new_OPEN_BGP_PEER_STATE_tp()
447 if result == OpEN.OPEN_E_NONE:
448 result = OpEN.openapiBgpPeerAdminStatusGet(self.m_client,
449 OpEN.OPEN_BGP_GET_FINAL,
453 print_bad_result(result,
'openapiBgpPeerAdminStatusGet')
# Compare the requested status with the holder contents, then release the holder.
455 print_sanity_results(result, status == OpEN.OPEN_BGP_PEER_STATE_tp_value(tmp_p), msg, status)
456 OpEN.delete_OPEN_BGP_PEER_STATE_tp(tmp_p)
# Set a peer's configured keep-alive time via
# openapiBgpPeerKeepAliveConfiguredSet(), read it back via
# openapiBgpPeerKeepAliveConfiguredGet() and print a sanity line.
# NOTE(review): this view of the method is flattened -- indentation is gone
# and both API calls are truncated; how `def_value` is used is not visible.
458 def test_peer_keep_alive(self, address, scope_id, time, def_value) :
459 """ Set the configured keep alive time for a given BGP peer.
460 The time is then read and compared to verify the operation."""
# ip_to_open_inet() returns (result, addr, type); `type` only labels the message.
462 result, addr, type = ip_to_open_inet(address)
463 msg = type +
' peer keep alive'
# Set step, guarded by the address having parsed into a known family.
465 if addr.family != OpEN.OPEN_AF_NONE:
466 result = OpEN.openapiBgpPeerKeepAliveConfiguredSet(self.m_client,
471 print_bad_result(result,
'openapiBgpPeerKeepAliveConfiguredSet')
# Read-back step, guarded by the previous result being OPEN_E_NONE; the Get
# call is given OPEN_BGP_GET_FINAL as its second argument.
473 tmp_p = OpEN.new_uint32_tp()
474 if result == OpEN.OPEN_E_NONE:
475 result = OpEN.openapiBgpPeerKeepAliveConfiguredGet(self.m_client,
476 OpEN.OPEN_BGP_GET_FINAL,
480 print_bad_result(result,
'openapiBgpPeerKeepAliveConfiguredGet')
# Compare the requested time with the holder contents, then release the holder.
482 print_sanity_results(result, time == OpEN.uint32_tp_value(tmp_p), msg, time)
483 OpEN.delete_uint32_tp(tmp_p)
# Set a peer's configured hold time via openapiBgpPeerHoldTimeConfiguredSet(),
# read it back via openapiBgpPeerHoldTimeConfiguredGet() and print a sanity
# line.
# NOTE(review): this view of the method is flattened -- indentation is gone
# and both API calls are truncated; how `def_value` is used is not visible.
485 def test_peer_hold_time(self, address, scope_id, time, def_value) :
486 """ Set the configured hold time for a given BGP peer.
487 The time is then read and compared to verify the operation."""
# ip_to_open_inet() returns (result, addr, type); `type` only labels the message.
489 result, addr, type = ip_to_open_inet(address)
490 msg = type +
' peer hold time'
# Set step, guarded by the address having parsed into a known family.
492 if addr.family != OpEN.OPEN_AF_NONE:
493 result = OpEN.openapiBgpPeerHoldTimeConfiguredSet(self.m_client,
498 print_bad_result(result,
'openapiBgpPeerHoldTimeConfiguredSet')
# Read-back step, guarded by the previous result being OPEN_E_NONE; the Get
# call is given OPEN_BGP_GET_FINAL as its second argument.
500 tmp_p = OpEN.new_uint32_tp()
501 if result == OpEN.OPEN_E_NONE:
502 result = OpEN.openapiBgpPeerHoldTimeConfiguredGet(self.m_client,
503 OpEN.OPEN_BGP_GET_FINAL,
507 print_bad_result(result,
'openapiBgpPeerHoldTimeConfiguredGet')
# Compare the requested time with the holder contents, then release the holder.
509 print_sanity_results(result, time == OpEN.uint32_tp_value(tmp_p), msg, time)
510 OpEN.delete_uint32_tp(tmp_p)
# Set a peer's activate flag via openapiBgpPeerActivateSet(), read it back via
# openapiBgpPeerActivateGet() and print a sanity line.
# NOTE(review): this view of the method is flattened -- indentation is gone
# and both API calls are truncated; where `af` and `def_value` are passed is
# not visible.
512 def test_peer_activate(self, af, address, scope_id, activate, def_value) :
513 """ Configure peer to advertise and accept routes for the given address family."""
# ip_to_open_inet() returns (result, addr, type); `type` only labels the message.
515 result, addr, type = ip_to_open_inet(address)
516 msg = type +
' peer activate'
# Set step, guarded by the address having parsed into a known family.
518 if addr.family != OpEN.OPEN_AF_NONE:
519 result = OpEN.openapiBgpPeerActivateSet(self.m_client,
525 print_bad_result(result,
'openapiBgpPeerActivateSet')
# Read-back step, guarded by the previous result being OPEN_E_NONE; the Get
# call is given OPEN_BGP_GET_FINAL as its second argument.
527 tmp_p = OpEN.new_bool_tp()
528 if result == OpEN.OPEN_E_NONE:
529 result = OpEN.openapiBgpPeerActivateGet(self.m_client,
530 OpEN.OPEN_BGP_GET_FINAL,
535 print_bad_result(result,
'openapiBgpPeerActivateGet')
# Compare the requested flag with the holder contents, then release the holder.
537 print_sanity_results(result, activate == OpEN.bool_tp_value(tmp_p), msg, activate)
538 OpEN.delete_bool_tp(tmp_p)
# Set a peer's advertisement interval via
# openapiBgpPeerAdvertisementIntervalSet(), read it back via
# openapiBgpPeerAdvertisementIntervalGet() and print a sanity line.
# NOTE(review): this view of the method is flattened -- indentation is gone
# and both API calls are truncated; where `af` and `def_value` are passed is
# not visible.
540 def test_peer_advertisement(self, af, address, scope_id, interval, def_value) :
541 """ Set the advertisment interval for a given BGP peer.
542 The interval is then read and compared to verify the operation."""
# ip_to_open_inet() returns (result, addr, type); `type` only labels the message.
544 result, addr, type = ip_to_open_inet(address)
545 msg = type +
' peer advertisement interval'
# Set step, guarded by the address having parsed into a known family.
547 if addr.family != OpEN.OPEN_AF_NONE:
548 result = OpEN.openapiBgpPeerAdvertisementIntervalSet(self.m_client,
554 print_bad_result(result,
'openapiBgpPeerAdvertisementIntervalSet')
# Read-back step, guarded by the previous result being OPEN_E_NONE; the Get
# call is given OPEN_BGP_GET_FINAL as its second argument.
556 tmp_p = OpEN.new_uint32_tp()
557 if result == OpEN.OPEN_E_NONE:
558 result = OpEN.openapiBgpPeerAdvertisementIntervalGet(self.m_client,
559 OpEN.OPEN_BGP_GET_FINAL,
# NOTE(review): the label below names openapiBgpPeerActivateGet although the
# call above is openapiBgpPeerAdvertisementIntervalGet -- this looks like a
# copy-paste slip in the error message.
564 print_bad_result(result,
'openapiBgpPeerActivateGet')
# Compare the requested interval with the holder contents, then release the holder.
566 print_sanity_results(result, interval == OpEN.uint32_tp_value(tmp_p), msg, interval)
567 OpEN.delete_uint32_tp(tmp_p)
# Set a peer's next-hop-self mode via openapiBgpPeerNextHopSelfModeSet(), read
# it back via openapiBgpPeerNextHopSelfModeGet() and print a sanity line.
# NOTE(review): this view of the method is flattened -- indentation is gone
# and both API calls are truncated; where `af` and `def_value` are passed is
# not visible.
569 def test_peer_next_hop_self(self, af, address, scope_id, enable, def_value) :
570 """ Configure BGP to use a local address when advertising routes to a given internal peer.
571 The flag is then read and compared to verify the operation."""
# ip_to_open_inet() returns (result, addr, type); `type` only labels the message.
573 result, addr, type = ip_to_open_inet(address)
574 msg = type +
' peer next hop'
# Set step, guarded by the address having parsed into a known family.
576 if addr.family != OpEN.OPEN_AF_NONE:
577 result = OpEN.openapiBgpPeerNextHopSelfModeSet(self.m_client,
583 print_bad_result(result,
'openapiBgpPeerNextHopSelfModeSet')
# Read-back step, guarded by the previous result being OPEN_E_NONE; the Get
# call is given OPEN_BGP_GET_FINAL as its second argument.
585 tmp_p = OpEN.new_OPEN_CONTROL_tp()
586 if result == OpEN.OPEN_E_NONE:
587 result = OpEN.openapiBgpPeerNextHopSelfModeGet(self.m_client,
588 OpEN.OPEN_BGP_GET_FINAL,
593 print_bad_result(result,
'openapiBgpPeerNextHopSelfModeGet')
# Compare the requested mode with the holder contents, then release the holder.
595 print_sanity_results(result, enable == OpEN.OPEN_CONTROL_tp_value(tmp_p), msg, enable)
596 OpEN.delete_OPEN_CONTROL_tp(tmp_p)
# Set a peer's prefix limit, threshold and warning-only flag via
# openapiBgpPeerPfxLimitSet(), read all three back via
# openapiBgpPeerPfxLimitGet() and print a sanity line.
# NOTE(review): this view of the method is flattened -- indentation, the
# branch line(s) between the two `limit = ...` assignments and most arguments
# of both API calls are not visible; where `af` and `def_value` are passed
# cannot be told from here.
598 def test_peer_pfx_limit(self, af, address, scope_id, threshold, warning_only, def_value) :
599 """ Set the prefix limit configured for a given peer.
600 In this example, the maximum prefix limit is retrieved from the switch based
601 on the IP address family. This max value and the given threshold, and
602 warningOnly flag is set and read back for comparison."""
# ip_to_open_inet() returns (result, addr, type); `type` only labels the message.
604 result, addr, type = ip_to_open_inet(address)
605 msg = type +
' peer prefix limit'
# Pick the limit to configure: for an IPv4 address it is read from
# openapiBgpIpv4RouteMax(); the other visible assignment uses
# OPEN_BGP_NBR_MAX_PFX_NOLIMIT (presumably the non-IPv4 branch -- confirm).
# The result code of openapiBgpIpv4RouteMax() is not checked.
608 if addr.family != OpEN.OPEN_AF_NONE:
609 if addr.family == OpEN.OPEN_AF_INET:
610 limit_p = OpEN.new_uint32_tp()
611 OpEN.openapiBgpIpv4RouteMax(self.m_client, limit_p)
613 limit = OpEN.uint32_tp_value(limit_p)
616 limit = OpEN.OPEN_BGP_NBR_MAX_PFX_NOLIMIT
# Set step (argument list truncated in this view).
618 result = OpEN.openapiBgpPeerPfxLimitSet(self.m_client,
626 print_bad_result(result,
'openapiBgpPeerPfxLimitSet')
# NOTE(review): limit_p is assigned a new holder here; no release of the
# holder allocated in the IPv4 branch above is visible, so it may leak --
# confirm against the complete file.
628 limit_p = OpEN.new_uint32_tp()
629 threshold_p = OpEN.new_uint32_tp()
630 warning_p = OpEN.new_bool_tp()
# Read-back step, guarded by the previous result being OPEN_E_NONE; the Get
# call is given OPEN_BGP_GET_FINAL as its second argument.
631 if result == OpEN.OPEN_E_NONE:
632 result = OpEN.openapiBgpPeerPfxLimitGet(self.m_client,
633 OpEN.OPEN_BGP_GET_FINAL,
640 print_bad_result(result,
'openapiBgpPeerPfxLimitGet')
# All three configured values must match the holders for the check to pass.
642 test = (limit == OpEN.uint32_tp_value(limit_p)
and
643 threshold == OpEN.uint32_tp_value(threshold_p)
and
644 warning_only == OpEN.bool_tp_value(warning_p))
# Report, then release the three holders.
646 print_sanity_results(result, test, msg,
'%d, %d, ' % (limit, threshold) + str(warning_only))
647 OpEN.delete_uint32_tp(limit_p)
648 OpEN.delete_uint32_tp(threshold_p)
649 OpEN.delete_bool_tp(warning_p)
# Set a peer's update-source interface via openapiBgpPeerUpdateSourceSet(),
# read it back via openapiBgpPeerUpdateSourceGet() and print a sanity line.
# NOTE(review): this view of the method is flattened -- indentation is gone
# and both API calls are truncated; how `def_value` is used is not visible.
652 def test_peer_update_source(self, address, scope_id, source, def_value) :
653 """ Set the interface whose IP address that BGP uses as the source
654 IP address in packets sent to a given peer. The interface is
655 then read and compared to verify the operation."""
# ip_to_open_inet() returns (result, addr, type); `type` only labels the
# message. The % formatting applies to the literal before the concatenation.
657 result, addr, type = ip_to_open_inet(address)
658 msg = type +
' peer remote source scope %d source %d' % (scope_id, source)
# Set step, guarded by the address having parsed into a known family.
660 if addr.family != OpEN.OPEN_AF_NONE:
661 result = OpEN.openapiBgpPeerUpdateSourceSet(self.m_client,
666 print_bad_result(result,
'openapiBgpPeerUpdateSourceSet')
# Read-back step, guarded by the previous result being OPEN_E_NONE; the Get
# call is given OPEN_BGP_GET_FINAL as its second argument.
668 tmp_p = OpEN.new_uint32_tp()
669 if result == OpEN.OPEN_E_NONE:
670 result = OpEN.openapiBgpPeerUpdateSourceGet(self.m_client,
671 OpEN.OPEN_BGP_GET_FINAL,
675 print_bad_result(result,
'openapiBgpPeerUpdateSourceGet')
# Compare the requested interface with the holder contents, then release the holder.
677 print_sanity_results(result, source == OpEN.uint32_tp_value(tmp_p), msg, address)
678 OpEN.delete_uint32_tp(tmp_p)
def main():
    """Demonstrate OpEN usage for BGP APIs.

    Connects to the switch as "bgp_example", configures the global BGP
    parameters, then creates an IPv4 and an IPv6 test peer and exercises the
    per-peer settings on each. When a routing interface is available, a
    link-local peer scoped to that interface and an update-source test are
    added; otherwise a notice asks the user to configure a routing interface.

    NOTE(review): the source this was rebuilt from is flattened and has some
    lines missing. The `def` line, the `else` branches and the starting value
    of match_bits are inferred. Any session teardown that followed the tests
    is not visible here -- confirm against the complete file.
    """
    print('Begin Sanity tests...')
    ret = open_.connect("bgp_example")
    if ret != OpEN.OPEN_E_NONE:
        print("Unable to connect")
        return

    open_.getNetworkOSVersion()
    client = open_.get_client()
    example = BgpExample(client)

    print('Establish BGP and global parameters...')
    example.test_local_as(1)
    example.test_local_id('10.1.1.1')
    example.test_local_pref(1234567890)
    example.test_distance(10, 20, 30)
    example.test_global_hold_time(65535)
    example.test_global_keep_alive(65535)
    example.test_log_neighbor(True)
    example.test_max_paths(OpEN.OPEN_AF_INET, False)
    example.test_max_paths(OpEN.OPEN_AF_INET, True)
    example.test_network(True, True, '20.10.0.0', 24, 'my-map1')
    example.test_network(False, True, '20.10.0.0', 24, 'my-map1')

    # Redistribute routes of every OSPF metric type listed here.
    match_bits = (OpEN.OPEN_OSPF_METRIC_TYPE_INTERNAL
                  | OpEN.OPEN_OSPF_METRIC_TYPE_EXT1
                  | OpEN.OPEN_OSPF_METRIC_TYPE_EXT2
                  | OpEN.OPEN_OSPF_METRIC_TYPE_NSSA_EXT1
                  | OpEN.OPEN_OSPF_METRIC_TYPE_NSSA_EXT2)
    example.test_redistribution(True, True, match_bits, True, True, 4294967295,
                                False, OpEN.OPEN_AF_INET, '')

    # (banner, peer address, address family, admin status,
    #  advertisement interval, prefix-limit threshold)
    peers = (
        ('Create a ipv4 test peer...', "20.2.2.2", OpEN.OPEN_AF_INET,
         OpEN.OPEN_BGP_START, 200, 20),
        ('Create a ipv6 test peer...', "2222::", OpEN.OPEN_AF_INET6,
         OpEN.OPEN_BGP_STOP, 222, 60),
    )
    for banner, peer, family, admin_status, interval, threshold in peers:
        print(banner)
        example.test_peer_remote_as(1, peer, 0)
        example.test_peer_admin_status(peer, 0, admin_status, True)
        example.test_peer_keep_alive(peer, 0, 600, True)
        example.test_peer_hold_time(peer, 0, 700, True)
        # Both peers are activated for the IPv6 address family, as in the
        # original call sequence.
        example.test_peer_activate(OpEN.OPEN_AF_INET6, peer, 0, True, True)
        example.test_peer_advertisement(family, peer, 0, interval, True)
        example.test_peer_next_hop_self(family, peer, 0, OpEN.OPEN_ENABLE, True)
        example.test_peer_pfx_limit(family, peer, 0, threshold, True, True)

    result, intf = get_first_routing_interface(client)
    if result == OpEN.OPEN_E_NONE:
        # The routing interface serves as the scope id of the link-local peer
        # and as the update source of the IPv4 peer.
        example.test_peer_remote_as(1, "fe80::1", intf)
        example.test_peer_update_source("20.2.2.2", 0, intf, True)
    else:
        print('\nSanity tests are incomplete because no routing interfaces are available!')
        print('Please configure a routing interface and re-run test.')


if __name__ == '__main__':
    main()