1 /*! \file acl_example.py
6 """acl_example.py: OpEN API Security Access Lists (ACL) configuration example"""
41 ACTION_PERMIT =
'permit'
44 MATCH_ASSIGN_QUEUE =
'assign-queue'
48 MATCH_ETHER_TYPE =
'ether-type'
50 MATCH_FLOW_LABEL =
'flow-label'
51 MATCH_FRAGMENTS =
'fragments'
52 MATCH_ICMP_TYPE =
'icmp-type'
53 MATCH_IGMP_TYPE =
'igmp-type'
54 MATCH_IP_DST_MASK =
'ip-dst-mask'
55 MATCH_IP_SRC_MASK =
'ip-src-mask'
56 MATCH_IPV6_DST_PFX =
'ipv6-dst-pfx'
57 MATCH_IPV6_SRC_PFX =
'ipv6-src-pfx'
58 MATCH_L4_DST_PORT =
'l4-dst-port'
59 MATCH_L4_SRC_PORT =
'l4-src-port'
60 MATCH_LOGGING =
'logging'
61 MATCH_MAC_DST_MASK =
'mac-dst-mask'
62 MATCH_MAC_SRC_MASK =
'mac-src-mask'
63 MATCH_PACKET_MIRRORING =
'packet-mirroring'
64 MATCH_PACKET_REDIRECTION =
'packet-redirection'
65 MATCH_PRECEDENCE =
'precedence'
66 MATCH_PROTOCOL =
'protocol'
67 MATCH_RATE_LIMIT =
'rate-limit'
68 MATCH_REDIRECT_AGENT =
'redirect-agent'
69 MATCH_ROUTING_FLAG =
'routing-flag'
70 MATCH_TCP_FLAGS =
'tcp-flags'
71 MATCH_TIME_BASED =
'time-based'
80 {ACL:[
'test_ip',OpEN.OPEN_ACL_TYPE_IP]},
81 {RULE:OpEN.OPEN_ACL_PERMIT},
83 {MATCH_ASSIGN_QUEUE:[]},
84 {MATCH_LOGGING:[
True]},
85 {MATCH_PACKET_MIRRORING:[]},
86 {RULE:OpEN.OPEN_ACL_PERMIT},
87 {MATCH_IP_SRC_MASK:[
'10.10.10.1',
'255.0.0.0']},
88 {MATCH_IP_DST_MASK:[
'20.20.20.1',
'255.0.0.0']},
89 {MATCH_PROTOCOL:[OpEN.OPENAPI_ACL_PROTOCOL_PIM]},
90 {MATCH_DSCP:[OpEN.OPENAPI_ACL_IP_DSCP_AF11]},
91 {MATCH_FRAGMENTS:[
True]},
92 {MATCH_PRECEDENCE:[7]},
93 {MATCH_TOS:[
'a0',
'a2']},
94 {MATCH_PACKET_REDIRECTION:[]},
95 {RULE:OpEN.OPEN_ACL_PERMIT},
96 {MATCH_PROTOCOL:[OpEN.OPENAPI_ACL_PROTOCOL_TCP]},
97 {MATCH_REDIRECT_AGENT:[100]},
98 {MATCH_RATE_LIMIT:[4294967295,128]},
99 {MATCH_TIME_BASED:[
'ipv4-time-range-name']},
100 {RULE:OpEN.OPEN_ACL_PERMIT},
101 {MATCH_PROTOCOL:[OpEN.OPENAPI_ACL_PROTOCOL_ICMP]},
102 {MATCH_ICMP_TYPE:[8,0,
True]},
103 {RULE:OpEN.OPEN_ACL_PERMIT},
104 {MATCH_PROTOCOL:[OpEN.OPENAPI_ACL_PROTOCOL_IGMP]},
105 {MATCH_IGMP_TYPE:[30]},
106 {RULE:OpEN.OPEN_ACL_PERMIT},
107 {MATCH_PROTOCOL:[OpEN.OPENAPI_ACL_PROTOCOL_TCP]},
108 {MATCH_TCP_FLAGS:[OpEN.OPENAPI_ACL_TCP_FLAG_FIN,OpEN.OPENAPI_ACL_TCP_FLAG_SYN,OpEN.OPENAPI_ACL_TCP_FLAG_ACK]},
109 {MATCH_L4_SRC_PORT:[OpEN.OPEN_ACL_L4_PORT_OPERATOR_EQUAL_TO,80,0]},
110 {RULE:OpEN.OPEN_ACL_PERMIT},
111 {MATCH_PROTOCOL:[OpEN.OPENAPI_ACL_PROTOCOL_UDP]},
112 {MATCH_L4_SRC_PORT:[OpEN.OPEN_ACL_L4_PORT_OPERATOR_LESS_THAN,10000,0]},
113 {MATCH_L4_DST_PORT:[OpEN.OPEN_ACL_L4_PORT_OPERATOR_GREATER_THAN,20000,0]},
114 {RULE:OpEN.OPEN_ACL_DENY},
116 {ACL:[
'test_ip6',OpEN.OPEN_ACL_TYPE_IPV6]},
117 {RULE:OpEN.OPEN_ACL_PERMIT},
118 {MATCH_EVERY:[
True]},
119 {MATCH_ASSIGN_QUEUE:[3]},
120 {MATCH_LOGGING:[
True]},
121 {MATCH_PACKET_MIRRORING:[]},
122 {RULE:OpEN.OPEN_ACL_PERMIT},
123 {MATCH_PROTOCOL:[OpEN.OPENAPI_ACL_PROTOCOL_PIM]},
124 {MATCH_IPV6_SRC_PFX:[
'1001::',24]},
125 {MATCH_IPV6_DST_PFX:[
'2001::',24]},
126 {MATCH_DSCP:[OpEN.OPENAPI_ACL_IP_DSCP_AF11]},
127 {MATCH_FRAGMENTS:[
True]},
128 {MATCH_PACKET_REDIRECTION:[100]},
129 {RULE:OpEN.OPEN_ACL_PERMIT},
130 {MATCH_PROTOCOL:[OpEN.OPENAPI_ACL_PROTOCOL_TCP]},
131 {MATCH_REDIRECT_AGENT:[100]},
132 {MATCH_RATE_LIMIT:[4294967295,128]},
133 {MATCH_TIME_BASED:[
'ipv6-time-range-name']},
134 {MATCH_FLOW_LABEL:[1048575]},
135 {MATCH_ROUTING_FLAG:[
True]},
136 {RULE:OpEN.OPEN_ACL_PERMIT},
137 {MATCH_PROTOCOL:[OpEN.OPENAPI_ACL_PROTOCOL_ICMPV6]},
138 {MATCH_ICMP_TYPE:[1,4,
True]},
139 {RULE:OpEN.OPEN_ACL_PERMIT},
140 {MATCH_PROTOCOL:[OpEN.OPENAPI_ACL_PROTOCOL_TCP]},
141 {MATCH_TCP_FLAGS:[OpEN.OPENAPI_ACL_TCP_FLAG_RST,OpEN.OPENAPI_ACL_TCP_FLAG_PSH,OpEN.OPENAPI_ACL_TCP_FLAG_URG]},
142 {MATCH_L4_SRC_PORT:[OpEN.OPEN_ACL_L4_PORT_OPERATOR_RANGE,20000,20010]},
143 {MATCH_L4_DST_PORT:[OpEN.OPEN_ACL_L4_PORT_OPERATOR_GREATER_THAN,60000,0]},
144 {RULE:OpEN.OPEN_ACL_DENY},
146 {ACL:[
'test_mac',OpEN.OPEN_ACL_TYPE_MAC]},
147 {RULE:OpEN.OPEN_ACL_PERMIT},
148 {MATCH_EVERY:[
True]},
149 {MATCH_ASSIGN_QUEUE:[5]},
150 {MATCH_LOGGING:[
True]},
151 {MATCH_PACKET_MIRRORING:[]},
152 {RULE:OpEN.OPEN_ACL_PERMIT},
153 {MATCH_MAC_SRC_MASK:[
'00:11:22:33:44:55',
'ff:ff:ff:00:00:00']},
154 {MATCH_MAC_DST_MASK:[
'55:44:33:22:11:00',
'00:00:00:ff:ff:ff']},
155 {MATCH_PACKET_REDIRECTION:[]},
156 {RULE:OpEN.OPEN_ACL_PERMIT},
157 {MATCH_REDIRECT_AGENT:[100]},
158 {MATCH_RATE_LIMIT:[4294967295,128]},
159 {MATCH_TIME_BASED:[
'mac-time-range-name']},
160 {MATCH_ETHER_TYPE:[0x809B]},
163 {MATCH_VLAN:[OpEN.OPEN_ACL_VLAN_OPERATOR_RANGE,2,100]},
164 {MATCH_VLAN2:[OpEN.OPEN_ACL_VLAN_OPERATOR_EQUAL_TO,200,0]},
165 {RULE:OpEN.OPEN_ACL_DENY},
170 (OpEN.OPEN_ACL_TYPE_IP,
'test_ip',
'test_ip_renamed'),
171 (OpEN.OPEN_ACL_TYPE_IPV6,
'test_ip6',
'test_ip6_renamed'),
172 (OpEN.OPEN_ACL_TYPE_MAC,
'test_mac',
'test_mac_renamed'),
176 INTERFACE_CREATE_LIST = [
177 {INTF:[OpEN.OPEN_ACL_TYPE_IP,
'test_ip_renamed', 1, OpEN.OPEN_ACL_INBOUND_ACL, 100]},
178 {INTF:[OpEN.OPEN_ACL_TYPE_IP,
'test_ip_renamed', 2, OpEN.OPEN_ACL_INBOUND_ACL, 200]},
179 {INTF:[OpEN.OPEN_ACL_TYPE_IPV6,
'test_ip6_renamed',3, OpEN.OPEN_ACL_INBOUND_ACL, 300]},
180 {INTF:[OpEN.OPEN_ACL_TYPE_MAC,
'test_mac_renamed',4, OpEN.OPEN_ACL_INBOUND_ACL, 400]},
181 {VLAN:[OpEN.OPEN_ACL_TYPE_IP,
'test_ip_renamed', 5, OpEN.OPEN_ACL_INBOUND_ACL, 500]},
182 {VLAN:[OpEN.OPEN_ACL_TYPE_IP,
'test_ip_renamed', 6, OpEN.OPEN_ACL_INBOUND_ACL, 600]},
183 {VLAN:[OpEN.OPEN_ACL_TYPE_IPV6,
'test_ip6_renamed',7, OpEN.OPEN_ACL_INBOUND_ACL, 700]},
184 {VLAN:[OpEN.OPEN_ACL_TYPE_MAC,
'test_mac_renamed',8, OpEN.OPEN_ACL_INBOUND_ACL, 800]},
186 {INTF:[OpEN.OPEN_ACL_TYPE_IP,
'test_ip_renamed', 9, OpEN.OPEN_ACL_INBOUND_ACL, 900]},
190 INTERFACE_DELETE_LIST = [
191 {INTF:[OpEN.OPEN_ACL_TYPE_IP,
'test_ip_renamed', 2, OpEN.OPEN_ACL_INBOUND_ACL]},
192 {VLAN:[OpEN.OPEN_ACL_TYPE_IP,
'test_ip_renamed', 5, OpEN.OPEN_ACL_INBOUND_ACL]},
196 """Convert ipv4 string to integer"""
198 return struct.unpack(
"!I", socket.inet_aton(addr))[0]
201 """Convert ipv4 integer to string"""
203 return socket.inet_ntoa(struct.pack(
"!I", addr))
205 def ipv6_to_int(addr):
206 """Convert ipv6 string to integer"""
208 str_ = socket.inet_pton(socket.AF_INET6, addr)
209 a, b = struct.unpack(
'!2Q', str_)
212 def int_to_ipv6(addr):
213 """Convert ipv6 integer to string"""
216 b = addr & ((1 << 64) - 1)
217 return socket.inet_ntop(socket.AF_INET6, struct.pack(
'!2Q', a, b))
219 def print_sanity_results(result, test, msg, feat):
220 """Print overall comparision results"""
222 if result == OpEN.OPEN_E_UNAVAIL:
223 print "Sanity test skipped."
224 elif result == OpEN.OPEN_E_NONE
and test ==
True:
225 print "Sanity Success - %s - %s." % (msg, feat)
227 print "Sanity Failure - %s - %s." % (msg, feat)
229 def print_bad_result(result, msg):
230 """Print some general error messages if the result is bad"""
232 if result == OpEN.OPEN_E_UNAVAIL:
233 print "Feature not supported - %s (err %d)." % (msg, result)
234 elif result != OpEN.OPEN_E_NONE:
235 print "Test Failure - %s (err %d)." % (msg, result)
238 """Simple ACL class implementing basic CRUD examples """
240 def __init__(self, client) :
241 self.m_client = client
243 def create_acl(self, acl_name, acl_type, init=None):
244 """Create an ACL whose type is based on acl_type"""
247 name_string = open_.getStringBuffer(len(acl_name) + 1, acl_name)
248 except OpENBufferSizeError:
249 print(
"create_acl: getStringBuffer raised OpENBufferSizeError")
252 print(
"create_acl: getStringBuffer raised TypeError")
254 name_buff = OpEN.open_buffdesc()
255 name_buff.pstart = name_string
256 name_buff.size = len(acl_name)+1
260 OpEN.openapiAclDeleteByName(self.m_client, acl_type, name_buff)
263 acl_p = OpEN.new_uint32_tp()
264 result = OpEN.openapiAclCreate(self.m_client, acl_type, name_buff, acl_p)
265 print_bad_result(result,
'openapiAclCreate')
267 if result == OpEN.OPEN_E_NONE:
269 acl_id = OpEN.uint32_tp_value(acl_p)
270 tmp_acl_p = OpEN.new_uint32_tp()
271 result = OpEN.openapiAclGet(self.m_client, acl_type, name_buff, tmp_acl_p)
272 print_bad_result(result,
'openapiAclGet')
273 print_sanity_results(result, acl_id==OpEN.uint32_tp_value(tmp_acl_p),
'create_acl', acl_name)
274 OpEN.delete_uint32_tp(tmp_acl_p)
276 OpEN.delete_uint32_tp(acl_p)
278 if result == OpEN.OPEN_E_NONE:
283 def create_rule(self, acl_id, rule_id, action):
284 """Create an action rule which is attached to the given acl_id"""
292 rule_p = OpEN.new_uint32_tp()
293 OpEN.uint32_tp_assign(rule_p, rule_id)
294 result = OpEN.openapiAclRuleActionAdd(self.m_client, acl_id, rule_p, action)
295 print_bad_result(result,
'openapiAclRuleActionAdd')
297 if result == OpEN.OPEN_E_NONE:
299 rule_id = OpEN.uint32_tp_value(rule_p)
300 action_p = OpEN.new_OPEN_ACL_ACTION_tp()
301 result = OpEN.openapiAclRuleActionGet(self.m_client, acl_id, rule_id, action_p)
302 print_bad_result(result,
'openapiAclRuleActionGet')
303 print_sanity_results(result, action==OpEN.OPEN_ACL_ACTION_tp_value(action_p),
'create_rule',
"")
304 OpEN.delete_OPEN_ACL_ACTION_tp(action_p)
306 OpEN.delete_uint32_tp(rule_p)
308 if result == OpEN.OPEN_E_NONE:
313 def create_match(self, acl_id, rule_id, match, conditions):
314 """Add a match-condition to the supplied acl and rule"""
316 if acl_id == 0
or rule_id == 0:
319 if match == MATCH_ASSIGN_QUEUE:
320 val_p = OpEN.new_uint32_tp()
322 if len(conditions) == 1:
327 OpEN.openapiAclMaxAssignQueueGet(self.m_client, val_p)
328 val = OpEN.uint32_tp_value(val_p)
330 result = OpEN.openapiAclRuleMatchAssignQueueAdd(self.m_client, acl_id, rule_id, val)
331 print_bad_result(result,
'openapiAclRuleMatchAssignQueueAdd')
333 if result == OpEN.OPEN_E_NONE:
335 val_p = OpEN.new_uint32_tp()
336 result = OpEN.openapiAclRuleMatchAssignQueueGet(self.m_client, acl_id, rule_id, val_p)
337 print_bad_result(result,
'openapiAclRuleMatchAssignQueueGet')
338 print_sanity_results(result, val==OpEN.uint32_tp_value(val_p),
'create_match', match)
340 OpEN.delete_uint32_tp(val_p)
342 elif match == MATCH_COS:
346 result = OpEN.openapiAclRuleMatchMacCosAdd(self.m_client, acl_id, rule_id, val)
347 print_bad_result(result,
'openapiAclRuleMatchMacCosAdd')
349 if result == OpEN.OPEN_E_NONE:
351 val_p = OpEN.new_uint32_tp()
352 result = OpEN.openapiAclRuleMatchMacCosGet(self.m_client, acl_id, rule_id, val_p)
353 print_bad_result(result,
'openapiAclRuleMatchMacCosGet')
354 print_sanity_results(result, val==OpEN.uint32_tp_value(val_p),
'create_match', match)
355 OpEN.delete_uint32_tp(val_p)
357 elif match == MATCH_COS2:
361 result = OpEN.openapiAclRuleMatchMacCos2Add(self.m_client, acl_id, rule_id, val)
362 print_bad_result(result,
'openapiAclRuleMatchMacCos2Add')
364 if result == OpEN.OPEN_E_NONE:
366 val_p = OpEN.new_uint32_tp()
367 result = OpEN.openapiAclRuleMatchMacCos2Get(self.m_client, acl_id, rule_id, val_p)
368 print_bad_result(result,
'openapiAclRuleMatchMacCos2Get')
369 print_sanity_results(result, val == OpEN.uint32_tp_value(val_p),
'create_match', match)
370 OpEN.delete_uint32_tp(val_p)
372 elif match == MATCH_DSCP:
375 result = OpEN.openapiAclRuleMatchDscpAdd(self.m_client, acl_id, rule_id, val)
376 print_bad_result(result,
'openapiAclRuleMatchDscpAdd')
378 if result == OpEN.OPEN_E_NONE:
380 val_p = OpEN.new_uint32_tp()
381 result = OpEN.openapiAclRuleMatchDscpGet(self.m_client, acl_id, rule_id, val_p)
382 print_bad_result(result,
'openapiAclRuleMatchDscpGet')
383 print_sanity_results(result, val == OpEN.uint32_tp_value(val_p),
'create_match', match)
384 OpEN.delete_uint32_tp(val_p)
386 elif match == MATCH_ETHER_TYPE:
389 result = OpEN.openapiAclRuleMatchMacEtherTypeAdd(self.m_client, acl_id, rule_id, val)
390 print_bad_result(result,
'openapiAclRuleMatchMacEtherTypeAdd')
392 if result == OpEN.OPEN_E_NONE:
394 val_p = OpEN.new_uint32_tp()
395 result = OpEN.openapiAclRuleMatchMacEtherTypeGet(self.m_client, acl_id, rule_id, val_p)
396 print_bad_result(result,
'openapiAclRuleMatchMacEtherTypeGet')
397 print_sanity_results(result, val == OpEN.uint32_tp_value(val_p),
'create_match', match)
398 OpEN.delete_uint32_tp(val_p)
400 elif match == MATCH_FLOW_LABEL:
404 result = OpEN.openapiAclRuleMatchFlowLabelAdd(self.m_client, acl_id, rule_id, val)
405 print_bad_result(result,
'openapiAclRuleMatchFlowLabelAdd')
407 if result == OpEN.OPEN_E_NONE:
409 val_p = OpEN.new_uint32_tp()
410 result = OpEN.openapiAclRuleMatchFlowLabelGet(self.m_client, acl_id, rule_id, val_p)
411 print_bad_result(result,
'openapiAclRuleMatchFlowLabelGet')
412 print_sanity_results(result, val == OpEN.uint32_tp_value(val_p),
'create_match', match)
413 OpEN.delete_uint32_tp(val_p)
415 elif match == MATCH_FRAGMENTS:
418 result = OpEN.openapiAclRuleMatchFragmentsAdd(self.m_client, acl_id, rule_id, val)
419 print_bad_result(result,
'openapiAclRuleMatchFragmentsAdd')
421 if result == OpEN.OPEN_E_NONE:
423 val_p = OpEN.new_bool_tp()
424 result = OpEN.openapiAclRuleMatchFragmentsGet(self.m_client, acl_id, rule_id, val_p)
425 print_bad_result(result,
'openapiAclRuleMatchFragmentsGet')
426 print_sanity_results(result, val == OpEN.bool_tp_value(val_p),
'create_match', match)
427 OpEN.delete_bool_tp(val_p)
429 elif match == MATCH_EVERY:
432 result = OpEN.openapiAclRuleMatchEveryAdd(self.m_client, acl_id, rule_id, val)
433 print_bad_result(result,
'openapiAclRuleMatchEveryAdd')
435 if result == OpEN.OPEN_E_NONE:
437 val_p = OpEN.new_bool_tp()
438 result = OpEN.openapiAclRuleMatchEveryGet(self.m_client, acl_id, rule_id, val_p)
439 print_bad_result(result,
'openapiAclRuleMatchEveryGet')
440 print_sanity_results(result, val == OpEN.bool_tp_value(val_p),
'create_match', match)
441 OpEN.delete_bool_tp(val_p)
443 elif match == MATCH_ICMP_TYPE:
444 type_ = conditions[0]
446 bool_ = conditions[2]
448 result = OpEN.openapiAclRuleMatchIcmpTypeCodeAdd(self.m_client, acl_id, rule_id, bool_, type_, code)
449 print_bad_result(result,
'openapiAclRuleMatchIcmpTypeCodeAdd')
451 if result == OpEN.OPEN_E_NONE:
453 type_p = OpEN.new_uint32_tp()
454 code_p = OpEN.new_uint32_tp()
455 result = OpEN.openapiAclRuleMatchIcmpTypeCodeGet(self.m_client, acl_id, rule_id, bool_, type_p, code_p)
456 print_bad_result(result,
'openapiAclRuleMatchIcmpTypeCodeGet')
457 tmp_type = OpEN.uint32_tp_value(type_p)
458 tmp_code = OpEN.uint32_tp_value(code_p)
459 print_sanity_results(result, type_ == tmp_type
and code == tmp_code,
'create_match', match)
460 OpEN.delete_uint32_tp(type_p)
461 OpEN.delete_uint32_tp(code_p)
463 elif match == MATCH_IGMP_TYPE:
464 type_ = conditions[0]
466 result = OpEN.openapiAclRuleMatchIgmpTypeAdd(self.m_client, acl_id, rule_id, type_)
467 print_bad_result(result,
'openapiAclRuleMatchIgmpTypeAdd')
469 if result == OpEN.OPEN_E_NONE:
471 type_p = OpEN.new_uint32_tp()
472 result = OpEN.openapiAclRuleMatchIgmpTypeGet(self.m_client, acl_id, rule_id, type_p)
473 print_bad_result(result,
'openapiAclRuleMatchIgmpTypeGet')
474 tmp_type = OpEN.uint32_tp_value(type_p)
475 print_sanity_results(result, type_ == tmp_type,
'create_match', match)
476 OpEN.delete_uint32_tp(type_p)
478 elif match == MATCH_IP_DST_MASK:
479 dst_addr = OpEN.open_inet_addr_t()
480 dst_addr.addr.ipv4 = socket.ntohl(ip_to_int(conditions[0]))
481 dst_addr.family = socket.AF_INET
483 mask_addr = OpEN.open_inet_addr_t()
484 mask_addr.addr.ipv4 = socket.ntohl(ip_to_int(conditions[1]))
485 mask_addr.family = socket.AF_INET
487 result = OpEN.openapiAclRuleMatchIpDstMaskAdd(self.m_client, acl_id, rule_id, dst_addr, mask_addr)
488 print_bad_result(result,
'openapiAclRuleMatchIpDstMaskAdd')
490 if result == OpEN.OPEN_E_NONE:
492 tmp_dst_addr = OpEN.open_inet_addr_t()
493 tmp_dst_mask = OpEN.open_inet_addr_t()
495 result = OpEN.openapiAclRuleMatchIpDstMaskGet(self.m_client, acl_id, rule_id, tmp_dst_addr, tmp_dst_mask)
496 print_bad_result(result,
'openapiAclRuleMatchIpDstMaskGet')
497 print_sanity_results(result, dst_addr.addr.ipv4 == tmp_dst_addr.addr.ipv4,
'create_match', match)
499 elif match == MATCH_IP_SRC_MASK:
500 src_addr = OpEN.open_inet_addr_t()
501 src_addr.addr.ipv4 = socket.ntohl(ip_to_int(conditions[0]))
502 src_addr.family = socket.AF_INET
504 mask_addr = OpEN.open_inet_addr_t()
505 mask_addr.addr.ipv4 = socket.ntohl(ip_to_int(conditions[1]))
506 mask_addr.family = socket.AF_INET
508 result = OpEN.openapiAclRuleMatchIpSrcMaskAdd(self.m_client, acl_id, rule_id, src_addr, mask_addr)
509 print_bad_result(result,
'openapiAclRuleMatchIpSrcMaskAdd')
511 if result == OpEN.OPEN_E_NONE:
513 tmp_src_addr = OpEN.open_inet_addr_t()
514 tmp_src_mask = OpEN.open_inet_addr_t()
516 result = OpEN.openapiAclRuleMatchIpSrcMaskGet(self.m_client, acl_id, rule_id, tmp_src_addr, tmp_src_mask)
517 print_bad_result(result,
'openapiAclRuleMatchIpSrcMaskGet')
518 print_sanity_results(result, src_addr.addr.ipv4 == tmp_src_addr.addr.ipv4,
'create_match', match)
520 elif match == MATCH_IPV6_DST_PFX:
522 addr = OpEN.open_inet_addr_t()
523 addr.addr.ipv6 = ipv6_to_int(conditions[0])
524 addr.family = socket.AF_INET6
528 result = OpEN.openapiAclRuleMatchIpv6DstPfxAdd(self.m_client, acl_id, rule_id, addr, pfx)
529 print_bad_result(result,
'openapiAclRuleMatchIpv6DstPfxAdd')
531 if result == OpEN.OPEN_E_NONE:
533 tmp_addr = OpEN.open_inet_addr_t()
534 tmp_pfx_p = OpEN.new_uint32_tp()
536 result = OpEN.openapiAclRuleMatchIpv6DstPfxGet(self.m_client, acl_id, rule_id, tmp_addr, tmp_pfx_p)
537 print_bad_result(result,
'openapiAclRuleMatchIpv6DstPfxGet')
538 print_sanity_results(result, addr.addr.ipv6 == tmp_addr.addr.ipv6,
'create_match', match)
539 OpEN.delete_uint32_tp(tmp_pfx_p)
541 elif match == MATCH_IPV6_SRC_PFX:
543 addr = OpEN.open_inet_addr_t()
544 addr.addr.ipv6 = ipv6_to_int(conditions[0])
545 addr.family = socket.AF_INET6
549 result = OpEN.openapiAclRuleMatchIpv6SrcPfxAdd(self.m_client, acl_id, rule_id, dst_addr, pfx)
550 print_bad_result(result,
'openapiAclRuleMatchIpv6SrcPfxAdd')
552 if result == OpEN.OPEN_E_NONE:
554 tmp_addr = OpEN.open_inet_addr_t()
555 tmp_pfx_p = OpEN.new_uint32_tp()
557 result = OpEN.openapiAclRuleMatchIpv6SrcPfxGet(self.m_client, acl_id, rule_id, tmp_addr, tmp_pfx_p)
558 print_bad_result(result,
'openapiAclRuleMatchIpv6SrcPfxGet')
559 print_sanity_results(result, addr.addr.ipv6 == tmp_addr.addr.ipv6,
'create_match', match)
560 OpEN.delete_uint32_tp(tmp_pfx_p)
562 elif match == MATCH_L4_DST_PORT:
564 start = conditions[1]
567 result = OpEN.openapiAclRuleMatchL4DstPortAdd(self.m_client, acl_id, rule_id, oper, start, end)
568 print_bad_result(result,
'openapiAclRuleMatchL4DstPortAdd')
570 if result == OpEN.OPEN_E_NONE:
572 oper_p = OpEN.new_OPEN_ACL_L4_PORT_OPERATOR_tp()
573 start_p = OpEN.new_uint32_tp()
574 end_p = OpEN.new_uint32_tp()
575 result = OpEN.openapiAclRuleMatchL4DstPortGet(self.m_client, acl_id, rule_id, oper_p, start_p, end_p)
576 print_bad_result(result,
'openapiAclRuleMatchL4DstPortGet')
577 print_sanity_results(result, oper == OpEN.OPEN_ACL_L4_PORT_OPERATOR_tp_value(oper_p)
and start == OpEN.uint32_tp_value(start_p),
'create_match', match)
578 OpEN.delete_OPEN_ACL_L4_PORT_OPERATOR_tp(oper_p)
579 OpEN.delete_uint32_tp(start_p)
580 OpEN.delete_uint32_tp(end_p)
582 elif match == MATCH_L4_SRC_PORT:
584 start = conditions[1]
587 result = OpEN.openapiAclRuleMatchL4SrcPortAdd(self.m_client, acl_id, rule_id, oper, start, end)
588 print_bad_result(result,
'openapiAclRuleMatchL4SrcPortAdd')
590 if result == OpEN.OPEN_E_NONE:
592 oper_p = OpEN.new_OPEN_ACL_L4_PORT_OPERATOR_tp()
593 start_p = OpEN.new_uint32_tp()
594 end_p = OpEN.new_uint32_tp()
595 result = OpEN.openapiAclRuleMatchL4SrcPortGet(self.m_client, acl_id, rule_id, oper_p, start_p, end_p)
596 print_bad_result(result,
'openapiAclRuleMatchL4SrcPortGet')
597 print_sanity_results(result, oper == OpEN.OPEN_ACL_L4_PORT_OPERATOR_tp_value(oper_p)
and start == OpEN.uint32_tp_value(start_p),
'create_match', match)
598 OpEN.delete_OPEN_ACL_L4_PORT_OPERATOR_tp(oper_p)
599 OpEN.delete_uint32_tp(start_p)
600 OpEN.delete_uint32_tp(end_p)
602 elif match == MATCH_LOGGING:
605 result = OpEN.openapiAclRuleMatchLoggingAdd(self.m_client, acl_id, rule_id, val)
606 print_bad_result(result,
'openapiAclRuleMatchLoggingAdd')
608 if result == OpEN.OPEN_E_NONE:
610 val_p = OpEN.new_uint32_tp()
611 result = OpEN.openapiAclRuleMatchLoggingGet(self.m_client, acl_id, rule_id, val_p)
612 print_bad_result(result,
'openapiAclRuleMatchLoggingGet')
613 print_sanity_results(result, val == OpEN.uint32_tp_value(val_p),
'create_match', match)
614 OpEN.delete_uint32_tp(val_p)
616 elif match == MATCH_MAC_DST_MASK:
622 mac_array = open_.getStringBuffer(len(mac) + 1, mac)
623 except OpENBufferSizeError:
624 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
627 print(
"create_match: getStringBuffer raised TypeError")
629 mac_buff = OpEN.open_buffdesc()
630 mac_buff.pstart = mac_array
631 mac_buff.size = len(mac)+1
634 mask_array = open_.getStringBuffer(len(mask) + 1, mask)
635 except OpENBufferSizeError:
636 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
639 print(
"create_match: getStringBuffer raised TypeError")
641 mask_buff = OpEN.open_buffdesc()
642 mask_buff.pstart = mask_array
643 mask_buff.size = len(mask)+1
645 result = OpEN.openapiAclRuleMatchMacDstMacAdd(self.m_client, acl_id, rule_id, mac_buff, mask_buff)
646 print_bad_result(result,
'openapiAclRuleMatchMacDstMacAdd')
648 if result == OpEN.OPEN_E_NONE:
650 max_len = OpEN.OPENAPI_L7_MAC_ADDR_STRING_LEN + 1
653 tmp_mac_buff_string = open_.getStringBuffer(max_len)
654 except OpENBufferSizeError:
655 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
658 print(
"create_match: getStringBuffer raised TypeError")
660 tmpmac_buff = OpEN.open_buffdesc()
661 tmpmac_buff.pstart = tmp_mac_buff_string
662 tmpmac_buff.size = max_len
665 tmp_mask_buff_string = open_.getStringBuffer(max_len)
666 except OpENBufferSizeError:
667 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
670 print(
"create_match: getStringBuffer raised TypeError")
672 tmp_mask_buff = OpEN.open_buffdesc()
673 tmp_mask_buff.pstart = tmp_mask_buff_string
674 tmp_mask_buff.size = max_len
676 result = OpEN.openapiAclRuleMatchMacDstMacGet(self.m_client, acl_id, rule_id, tmpmac_buff, tmp_mask_buff)
677 print_bad_result(result,
'openapiAclRuleMatchMacDstMacGet')
678 tmp_mac = tmp_mac_buff_string.cast()
679 tmp_mask = tmp_mask_buff_string.cast()
680 print_sanity_results(result, mac == tmp_mac
and mask == tmp_mask,
'create_match', match)
682 elif match == MATCH_MAC_SRC_MASK:
688 mac_array = open_.getStringBuffer(len(mac) + 1, mac)
689 except OpENBufferSizeError:
690 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
693 print(
"create_match: getStringBuffer raised TypeError")
695 mac_buff = OpEN.open_buffdesc()
696 mac_buff.pstart = mac_array
697 mac_buff.size = len(mac)+1
700 mask_array = open_.getStringBuffer(len(mask) + 1, mask)
701 except OpENBufferSizeError:
702 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
705 print(
"create_match: getStringBuffer raised TypeError")
708 mask_buff = OpEN.open_buffdesc()
709 mask_buff.pstart = mask_array
710 mask_buff.size = len(mask)+1
712 result = OpEN.openapiAclRuleMatchMacSrcMacAdd(self.m_client, acl_id, rule_id, mac_buff, mask_buff)
713 print_bad_result(result,
'openapiAclRuleMatchMacSrcMacAdd')
715 if result == OpEN.OPEN_E_NONE:
717 max_len = OpEN.OPENAPI_L7_MAC_ADDR_STRING_LEN + 1
720 tmp_mac_buff_string = open_.getStringBuffer(max_len)
721 except OpENBufferSizeError:
722 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
725 print(
"create_match: getStringBuffer raised TypeError")
727 tmpmac_buff = OpEN.open_buffdesc()
728 tmpmac_buff.pstart = tmp_mac_buff_string
729 tmpmac_buff.size = max_len
732 tmp_mask_buff_string = open_.getStringBuffer(max_len)
733 except OpENBufferSizeError:
734 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
737 print(
"create_match: getStringBuffer raised TypeError")
739 tmp_mask_buff = OpEN.open_buffdesc()
740 tmp_mask_buff.pstart = tmp_mask_buff_string
741 tmp_mask_buff.size = max_len
743 result = OpEN.openapiAclRuleMatchMacSrcMacGet(self.m_client, acl_id, rule_id, tmpmac_buff, tmp_mask_buff)
744 print_bad_result(result,
'openapiAclRuleMatchMacSrcMacGet')
745 tmp_mac = tmp_mac_buff_string.cast()
746 tmp_mask = tmp_mask_buff_string.cast()
747 print_sanity_results(result, mac == tmp_mac
and mask == tmp_mask,
'create_match', match)
749 elif match == MATCH_PACKET_MIRRORING:
750 if len(conditions) == 1:
755 val_p = OpEN.new_uint32_tp()
756 OpEN.openapiIfFirstGet(self.m_client, OpEN.OPEN_INTF_TYPE_PHY, val_p)
757 val = OpEN.uint32_tp_value(val_p)
758 OpEN.delete_uint32_tp(val_p)
760 result = OpEN.openapiAclRuleMatchMirrorAdd(self.m_client, acl_id, rule_id, val)
761 print_bad_result(result,
'openapiAclRuleMatchMirrorAdd')
763 if result == OpEN.OPEN_E_NONE:
765 val_p = OpEN.new_uint32_tp()
766 result = OpEN.openapiAclRuleMatchMirrorGet(self.m_client, acl_id, rule_id, val_p)
767 print_bad_result(result,
'openapiAclRuleMatchMirrorGet')
768 print_sanity_results(result, val == OpEN.uint32_tp_value(val_p),
'create_match', match)
769 OpEN.delete_uint32_tp(val_p)
771 elif match == MATCH_PACKET_REDIRECTION:
773 if len(conditions) == 1:
778 val_p = OpEN.new_uint32_tp()
779 OpEN.openapiIfFirstGet(self.m_client, OpEN.OPEN_INTF_TYPE_PHY, val_p)
780 val = OpEN.uint32_tp_value(val_p)
781 OpEN.delete_uint32_tp(val_p)
783 result = OpEN.openapiAclRuleMatchRedirectAdd(self.m_client, acl_id, rule_id, val)
784 print_bad_result(result,
'openapiAclRuleMatchRedirectAdd')
786 if result == OpEN.OPEN_E_NONE:
788 val_p = OpEN.new_uint32_tp()
789 result = OpEN.openapiAclRuleMatchRedirectGet(self.m_client, acl_id, rule_id, val_p)
790 print_bad_result(result,
'openapiAclRuleMatchRedirectGet')
791 print_sanity_results(result, val == OpEN.uint32_tp_value(val_p),
'create_match', match)
792 OpEN.delete_uint32_tp(val_p)
794 elif match == MATCH_PRECEDENCE:
797 result = OpEN.openapiAclRuleMatchPrecedenceAdd(self.m_client, acl_id, rule_id, val)
798 print_bad_result(result,
'openapiAclRuleMatchPrecedenceAdd')
800 if result == OpEN.OPEN_E_NONE:
802 val_p = OpEN.new_uint32_tp()
803 result = OpEN.openapiAclRuleMatchPrecedenceGet(self.m_client, acl_id, rule_id, val_p)
804 print_bad_result(result,
'openapiAclRuleMatchPrecedenceGet')
805 print_sanity_results(result, val == OpEN.uint32_tp_value(val_p),
'create_match', match)
806 OpEN.delete_uint32_tp(val_p)
808 elif match == MATCH_PROTOCOL:
811 result = OpEN.openapiAclRuleMatchProtocolAdd(self.m_client, acl_id, rule_id, val)
812 print_bad_result(result,
'openapiAclRuleMatchProtocolAdd')
814 if result == OpEN.OPEN_E_NONE:
816 val_p = OpEN.new_uint32_tp()
817 result = OpEN.openapiAclRuleMatchProtocolGet(self.m_client, acl_id, rule_id, val_p)
818 print_bad_result(result,
'openapiAclRuleMatchProtocolGet')
819 print_sanity_results(result, val == OpEN.uint32_tp_value(val_p),
'create_match', match)
820 OpEN.delete_uint32_tp(val_p)
822 elif match == MATCH_RATE_LIMIT:
826 result = OpEN.openapiAclRuleMatchRateLimitAdd(self.m_client, acl_id, rule_id, val1, val2)
827 print_bad_result(result,
'openapiAclRuleMatchRateLimitAdd')
829 if result == OpEN.OPEN_E_NONE:
831 val1_p = OpEN.new_uint32_tp()
832 val2_p = OpEN.new_uint32_tp()
833 result = OpEN.openapiAclRuleMatchRateLimitGet(self.m_client, acl_id, rule_id, val1_p, val2_p)
834 print_bad_result(result,
'openapiAclRuleMatchRateLimitGet')
835 print_sanity_results(result, val1 == OpEN.uint32_tp_value(val1_p)
and val2 == OpEN.uint32_tp_value(val2_p),
'create_match', match)
836 OpEN.delete_uint32_tp(val1_p)
837 OpEN.delete_uint32_tp(val2_p)
839 elif match == MATCH_REDIRECT_AGENT:
842 result = OpEN.openapiAclRuleMatchRedirectAgentAdd(self.m_client, acl_id, rule_id, val)
843 print_bad_result(result,
'openapiAclRuleMatchRedirectAgentAdd')
845 if result == OpEN.OPEN_E_NONE:
847 val_p = OpEN.new_uint32_tp()
848 result = OpEN.openapiAclRuleMatchRedirectAgentGet(self.m_client, acl_id, rule_id, val_p)
849 print_bad_result(result,
'openapiAclRuleMatchRedirectAgentGet')
850 print_sanity_results(result, val == OpEN.uint32_tp_value(val_p),
'create_match', match)
851 OpEN.delete_uint32_tp(val_p)
853 elif match == MATCH_ROUTING_FLAG:
857 result = OpEN.openapiAclRuleMatchRoutingAdd(self.m_client, acl_id, rule_id, val)
858 print_bad_result(result,
'openapiAclRuleMatchRoutingAdd')
860 if result == OpEN.OPEN_E_NONE:
862 val_p = OpEN.new_bool_tp()
863 result = OpEN.openapiAclRuleMatchRoutingGet(self.m_client, acl_id, rule_id, val_p)
864 print_bad_result(result,
'openapiAclRuleMatchRoutingGet')
865 print_sanity_results(result, val == OpEN.bool_tp_value(val_p),
'create_match', match)
866 OpEN.delete_bool_tp(val_p)
868 elif match == MATCH_TCP_FLAGS:
872 for idx
in xrange(0, len(conditions)) :
873 flag_val |= (
True << conditions[idx])
874 flag_mask |= (
True << conditions[idx])
876 result = OpEN.openapiAclRuleMatchTcpFlagsAdd(self.m_client, acl_id, rule_id, flag_val, flag_mask)
877 print_bad_result(result,
'openapiAclRuleMatchTcpFlagsAdd')
879 if result == OpEN.OPEN_E_NONE:
881 flag_val_p = OpEN.new_uint32_tp()
882 flag_mask_p = OpEN.new_uint32_tp()
883 result = OpEN.openapiAclRuleMatchTcpFlagsGet(self.m_client, acl_id, rule_id, flag_val_p, flag_mask_p)
884 print_bad_result(result,
'openapiAclRuleMatchTcpFlagsGet')
885 print_sanity_results(result, flag_val == OpEN.uint32_tp_value(flag_val_p),
'create_match', match)
886 OpEN.delete_uint32_tp(flag_val_p)
887 OpEN.delete_uint32_tp(flag_mask_p)
889 elif match == MATCH_TIME_BASED:
890 time_name = conditions[0]
893 name_array = open_.getStringBuffer(len(time_name) + 1, time_name)
894 except OpENBufferSizeError:
895 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
898 print(
"create_match: getStringBuffer raised TypeError")
900 name_buff = OpEN.open_buffdesc()
901 name_buff.pstart = name_array
902 name_buff.size = len(time_name)+1
904 result = OpEN.openapiAclRuleMatchTimeRangeAdd(self.m_client, acl_id, rule_id, name_buff)
905 print_bad_result(result,
'openapiAclRuleMatchTimeRangeAdd')
907 if result == OpEN.OPEN_E_NONE:
909 max_len = OpEN.OPENAPI_TIME_RANGE_NAME_LEN_MAX + 1
912 tmp_name_string = open_.getStringBuffer(max_len)
913 except OpENBufferSizeError:
914 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
917 print(
"create_match: getStringBuffer raised TypeError")
919 tmp_name_buff = OpEN.open_buffdesc()
920 tmp_name_buff.pstart = tmp_name_string
921 tmp_name_buff.size = max_len
923 result = OpEN.openapiAclRuleMatchTimeRangeGet(self.m_client, acl_id, rule_id, tmp_name_buff)
924 print_bad_result(result,
'openapiAclRuleMatchTimeRangeGet')
925 tmp_time_name = tmp_name_string.cast()
926 print_sanity_results(result, time_name == tmp_time_name,
'create_match', match)
928 if result == OpEN.OPEN_E_NONE:
929 status_p = OpEN.new_OPEN_ACL_RULE_STATUS_tp()
930 result = OpEN.openapiAclRuleMatchTimeRangeStatusGet(self.m_client, acl_id, rule_id, status_p)
931 print_bad_result(result,
'openapiAclRuleMatchTimeRangeStatusGet')
932 print_sanity_results(result,
True,
'create_match', match+
' status')
933 OpEN.delete_OPEN_ACL_RULE_STATUS_tp(status_p)
935 elif match == MATCH_TOS:
936 tos_val = conditions[0]
937 tos_mask = conditions[1]
940 tos_val_array = open_.getStringBuffer(len(tos_val) + 1, tos_val)
941 except OpENBufferSizeError:
942 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
945 print(
"create_match: getStringBuffer raised TypeError")
947 tos_val_buff = OpEN.open_buffdesc()
948 tos_val_buff.pstart = tos_val_array
949 tos_val_buff.size = len(tos_val) + 1
952 tos_mask_array = open_.getStringBuffer(len(tos_mask) + 1, tos_mask)
953 except OpENBufferSizeError:
954 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
957 print(
"create_match: getStringBuffer raised TypeError")
959 tos_mask_buff = OpEN.open_buffdesc()
960 tos_mask_buff.pstart = tos_mask_array
961 tos_mask_buff.size = len(tos_mask) + 1
963 result = OpEN.openapiAclRuleMatchTosAdd(self.m_client, acl_id, rule_id, tos_val_buff, tos_mask_buff)
964 print_bad_result(result,
'openapiAclRuleMatchTosAdd')
966 if result == OpEN.OPEN_E_NONE:
971 tmp_tos_buffstring = open_.getStringBuffer(max_len)
972 except OpENBufferSizeError:
973 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
976 print(
"create_match: getStringBuffer raised TypeError")
978 tmp_tos_buff = OpEN.open_buffdesc()
979 tmp_tos_buff.pstart = tmp_tos_buffstring
980 tmp_tos_buff.size = max_len
983 tmp_mask_buff_string = open_.getStringBuffer(max_len)
984 except OpENBufferSizeError:
985 print(
"create_match: getStringBuffer raised OpENBufferSizeError")
988 print(
"create_match: getStringBuffer raised TypeError")
990 tmp_mask_buff = OpEN.open_buffdesc()
991 tmp_mask_buff.pstart = tmp_mask_buff_string
992 tmp_mask_buff.size = max_len
994 result = OpEN.openapiAclRuleMatchTosGet(self.m_client, acl_id, rule_id, tmp_tos_buff, tmp_mask_buff)
995 print_bad_result(result,
'openapiAclRuleMatchTosGet')
996 tmp_tos_val = tmp_tos_buffstring.cast()
997 tmp_tos_mask = tmp_mask_buff_string.cast()
998 print_sanity_results(result, tos_val == tmp_tos_val
and tos_mask == tmp_tos_mask,
'create_match', match)
1000 elif match == MATCH_VLAN:
1002 oper = conditions[0]
1003 start = conditions[1]
1006 result = OpEN.openapiAclRuleMatchMacVlanAdd(self.m_client, acl_id, rule_id, oper, start, end)
1007 print_bad_result(result,
'openapiAclRuleMatchMacVlanAdd')
1009 if result == OpEN.OPEN_E_NONE:
1011 oper_p = OpEN.new_OPEN_ACL_VLAN_OPERATOR_tp()
1012 start_p = OpEN.new_uint32_tp()
1013 end_p = OpEN.new_uint32_tp()
1014 result = OpEN.openapiAclRuleMatchMacVlanGet(self.m_client, acl_id, rule_id, oper_p, start_p, end_p)
1015 print_bad_result(result,
'openapiAclRuleMatchMacVlanGet')
1016 print_sanity_results(result, oper == OpEN.OPEN_ACL_L4_PORT_OPERATOR_tp_value(oper_p)
and start == OpEN.uint32_tp_value(start_p),
'create_match', match)
1017 OpEN.delete_OPEN_ACL_VLAN_OPERATOR_tp(oper_p)
1018 OpEN.delete_uint32_tp(start_p)
1019 OpEN.delete_uint32_tp(end_p)
1021 elif match == MATCH_VLAN2:
1023 oper = conditions[0]
1024 start = conditions[1]
1027 result = OpEN.openapiAclRuleMatchMacSecondaryVlanAdd(self.m_client, acl_id, rule_id, oper, start, end)
1028 print_bad_result(result,
'openapiAclRuleMatchMacSecondaryVlanAdd')
1030 if result == OpEN.OPEN_E_NONE:
1032 oper_p = OpEN.new_OPEN_ACL_VLAN_OPERATOR_tp()
1033 start_p = OpEN.new_uint32_tp()
1034 end_p = OpEN.new_uint32_tp()
1035 result = OpEN.openapiAclRuleMatchMacSecondaryVlanGet(self.m_client, acl_id, rule_id, oper_p, start_p, end_p)
1036 print_bad_result(result,
'openapiAclRuleMatchMacSecondaryVlanGet')
1037 print_sanity_results(result, oper == OpEN.OPEN_ACL_L4_PORT_OPERATOR_tp_value(oper_p)
and start == OpEN.uint32_tp_value(start_p),
'create_match', match)
1038 OpEN.delete_OPEN_ACL_VLAN_OPERATOR_tp(oper_p)
1039 OpEN.delete_uint32_tp(start_p)
1040 OpEN.delete_uint32_tp(end_p)
1042 def test_acl_creation(self, acls):
1043 """Parse through the acls list and call acl functions based on keys"""
1048 for key, val
in acl.items():
1050 acl_id = self.create_acl(val[0], val[1],
True)
1052 rule_id = self.create_rule(acl_id, rule_id, val)
1054 self.create_match(acl_id, rule_id, key, val)
1056 def test_acl_retrieval(self):
1057 """Perform some generic ACL retrieval and iteration tests."""
1058 acl_p = OpEN.new_uint32_tp()
1059 acl_type = OpEN.OPEN_ACL_TYPE_IP
1062 result = OpEN.openapiAclGetFirst(self.m_client, acl_type, acl_p)
1063 print_bad_result(result,
"openapiAclGetFirst - IP ACL")
1064 print_sanity_results(result, (OpEN.uint32_tp_value(acl_p)>0),
"openapiAclGetFirst",
"IP ACL")
1067 tmp_acl = self.create_acl(
"another_test_ip", acl_type,
True)
1072 while OpEN.openapiAclGetNext(self.m_client, acl_type, tmp_acl, acl_p) == OpEN.OPEN_E_NONE:
1073 tmp_acl = OpEN.uint32_tp_value(acl_p)
1074 print_sanity_results(OpEN.OPEN_E_NONE, (tmp_acl>0),
"openapiAclGetNext",
"temporary")
1078 result = OpEN.openapiAclDelete(self.m_client, tmp_acl)
1079 print_sanity_results(result,
True,
"openapiAclDelete",
"temporary")
1081 OpEN.delete_uint32_tp(acl_p)
1083 def test_acl_rename(self, acls):
1084 """Rename an ACL and verify."""
1091 acl_p = OpEN.new_uint32_tp()
1094 name_string = open_.getStringBuffer(len(name) + 1, name)
1095 except OpENBufferSizeError:
1096 print(
"test_acl_rename: getStringBuffer raised OpENBufferSizeError")
1099 print(
"test_acl_rename: getStringBuffer raised TypeError")
1101 name_buff = OpEN.open_buffdesc()
1102 name_buff.pstart = name_string
1103 name_buff.size = len(name)+1
1106 name_get_string = open_.getStringBuffer(OpEN.OPENAPI_ACL_NAME_LEN_MAX + 1)
1107 except OpENBufferSizeError:
1108 print(
"test_acl_rename: getStringBuffer raised OpENBufferSizeError")
1111 print(
"test_acl_rename: getStringBuffer raised TypeError")
1113 name_get_buff = OpEN.open_buffdesc()
1114 name_get_buff.pstart = name_get_string
1115 name_get_buff.size = OpEN.OPENAPI_ACL_NAME_LEN_MAX + 1
1118 new_name_string = open_.getStringBuffer(len(new_name) + 1, new_name)
1119 except OpENBufferSizeError:
1120 print(
"test_acl_rename: getStringBuffer raised OpENBufferSizeError")
1123 print(
"test_acl_rename: getStringBuffer raised TypeError")
1125 newname_buff = OpEN.open_buffdesc()
1126 newname_buff.pstart = new_name_string
1127 newname_buff.size = len(new_name)+1
1129 result = OpEN.openapiAclGet(self.m_client, acl_type, name_buff, acl_p)
1130 tmp_acl = OpEN.uint32_tp_value(acl_p)
1131 print_bad_result(result,
"openapiAclGet - IP ACL")
1133 if (result == OpEN.OPEN_E_NONE):
1134 result = OpEN.openapiAclNameGet(self.m_client, tmp_acl, name_get_buff)
1135 print_bad_result(result,
"openapiAclNameGet - IP ACL")
1137 if (result == OpEN.OPEN_E_NONE):
1138 result = OpEN.openapiAclRename(self.m_client, tmp_acl, newname_buff)
1139 print_bad_result(result,
"openapiAclRename - IP ACL")
1140 print_sanity_results(result, (tmp_acl>0),
"openapiAclRename", new_name_string.cast())
1142 OpEN.delete_uint32_tp(acl_p)
1144 def test_acl_traffic_filter_create(self, interfaces):
1145 """Parse through the interfaces list and call traffic filter functions based on keys"""
1147 for interface
in interfaces:
1148 for key, val
in interface.items():
1149 acl_p = OpEN.new_uint32_tp()
1158 name_string = open_.getStringBuffer(len(name) + 1, name)
1159 except OpENBufferSizeError:
1160 print(
"test_acl_traffic_filter_create: getStringBuffer raised OpENBufferSizeError")
1163 print(
"test_acl_traffic_filter_create: getStringBuffer raised TypeError")
1165 name_buff = OpEN.open_buffdesc()
1166 name_buff.pstart = name_string
1167 name_buff.size = len(name)+1
1169 result = OpEN.openapiAclGet(self.m_client, type_, name_buff, acl_p)
1170 print_bad_result(result,
"openapiAclGet - IP ACL")
1172 if result == OpEN.OPEN_E_NONE:
1173 acl = OpEN.uint32_tp_value(acl_p)
1175 result = OpEN.openapiAclIntfDirAdd(self.m_client, intf, dir_, acl, seq)
1176 disp =
"ACL:%s INTF:%d dir:%d seq:%d" % (name_string.cast(), intf, dir_, seq)
1177 print_sanity_results(result,
True,
"openapiAclIntfDirAdd", disp)
1179 result = OpEN.openapiAclVlanDirAdd(self.m_client, intf, dir_, acl, seq)
1180 disp =
"ACL:%s VLAN:%d dir:%d seq:%d" % (name_string.cast(), intf, dir_, seq)
1181 print_sanity_results(result,
True,
"openapiAclVlanDirAdd", disp)
1183 OpEN.delete_uint32_tp(acl_p)
1185 def test_acl_traffic_filter_delete(self, interfaces):
1186 """Parse through the interfaces list and call traffic filter functions based on keys"""
1188 for interface
in interfaces:
1189 for key, val
in interface.items():
1190 acl_p = OpEN.new_uint32_tp()
1198 name_string = open_.getStringBuffer(len(name) + 1, name)
1199 except OpENBufferSizeError:
1200 print(
"test_acl_traffic_filter_create: getStringBuffer raised OpENBufferSizeError")
1203 print(
"test_acl_traffic_filter_create: getStringBuffer raised TypeError")
1205 name_buff = OpEN.open_buffdesc()
1206 name_buff.pstart = name_string
1207 name_buff.size = len(name)+1
1209 result = OpEN.openapiAclGet(self.m_client, type_, name_buff, acl_p)
1210 print_bad_result(result,
"openapiAclGet - IP ACL")
1212 if result == OpEN.OPEN_E_NONE:
1213 acl = OpEN.uint32_tp_value(acl_p)
1215 result = OpEN.openapiAclIntfDirDelete(self.m_client, intf, dir_, acl)
1216 disp =
"ACL:%s INTF:%d dir:%d" % (name_string.cast(), intf, dir_)
1217 print_sanity_results(result,
True,
"openapiAclIntfDirDelete", disp)
1219 result = OpEN.openapiAclVlanDirDelete(self.m_client, intf, dir_, acl)
1220 disp =
"ACL:%s VLAN:%d dir:%d" % (name_string.cast(), intf, dir_)
1221 print_sanity_results(result,
True,
"openapiAclVlanDirDelete", disp)
1223 OpEN.delete_uint32_tp(acl_p)
1225 def test_acl_route_filter(self, acl_type, name):
1226 """Apply an ACL as a route filter."""
1229 name_string = open_.getStringBuffer(len(name) + 1, name)
1230 except OpENBufferSizeError:
1231 print(
"test_acl_route_filter: getStringBuffer raised OpENBufferSizeError")
1234 print(
"test_acl_route_filter: getStringBuffer raised TypeError")
1236 name_buff = OpEN.open_buffdesc()
1237 name_buff.pstart = name_string
1238 name_buff.size = len(name)+1
1240 acl_p = OpEN.new_uint32_tp()
1242 result = OpEN.openapiAclGet(self.m_client, acl_type, name_buff, acl_p)
1243 print_bad_result(result,
"openapiAclGet - IP ACL")
1245 if result == OpEN.OPEN_E_NONE:
1246 acl = OpEN.uint32_tp_value(acl_p)
1247 action_p = OpEN.new_OPEN_ACL_ACTION_tp()
1248 action_p = OpEN.copy_OPEN_ACL_ACTION_tp(OpEN.OPEN_ACL_DENY)
1250 routePrefix = socket.ntohl(ip_to_int(
'10.10.10.1'))
1251 routeMask = socket.ntohl(ip_to_int(
'255.0.0.0'))
1253 result = OpEN.openapiAclRouteFilter(self.m_client, acl, routePrefix, routeMask, action_p);
1254 print_sanity_results(result, OpEN.OPEN_ACL_PERMIT == OpEN.OPEN_ACL_ACTION_tp_value(action_p),
"openapiAclRouteFilter",
"permit")
1256 OpEN.delete_uint32_tp(acl_p)
1257 OpEN.delete_OPEN_ACL_ACTION_tp(action_p)
1259 def show_interface_summary(self, dir_):
1260 """Iterate and display interfaces that are inuse with matching direction"""
1262 print "\nACL interface direction assignments"
1263 print "Interface Direction"
1264 print "--------- ---------"
1267 intf_p = OpEN.new_uint32_tp()
1268 dir_p = OpEN.new_OPEN_ACL_DIRECTION_tp()
1269 inuse_p = OpEN.new_OPEN_CONTROL_tp()
1271 while OpEN.openapiAclIntfDirGetNext(self.m_client, intf, dir_, intf_p, dir_p) == OpEN.OPEN_E_NONE:
1273 if OpEN.openapiAclIsIntfInUse(self.m_client, intf, dir_, inuse_p) == OpEN.OPEN_E_NONE:
1274 if OpEN.OPEN_CONTROL_tp_value(inuse_p) == OpEN.OPEN_ENABLE:
1275 print '%9d ' % intf,
'Inbound' if dir_ == OpEN.OPEN_ACL_INBOUND_ACL
else 'Outbound'
1276 dir_ = OpEN.OPEN_ACL_DIRECTION_tp_value(dir_p)
1277 intf = OpEN.uint32_tp_value(intf_p)
1279 OpEN.delete_uint32_tp(intf_p)
1280 OpEN.delete_OPEN_ACL_DIRECTION_tp(dir_p)
1281 OpEN.delete_OPEN_CONTROL_tp(inuse_p)
1283 def show_interface_details(self, type_, intf, dir_):
1284 """Retrieve a list of ACLs for the given interface and direction and display its contents"""
1287 list_info = OpEN.OPEN_ACL_VLAN_DIR_LIST_t()
1288 result = OpEN.openapiAclVlanDirListGet(self.m_client, intf, dir_, list_info)
1289 print_bad_result(result,
"openapiAclVlanDirListGet")
1290 print '\nACL VLAN %d, %s details' % (intf,
'Inbound' if dir_ == OpEN.OPEN_ACL_INBOUND_ACL
else 'Outbound')
1291 print 'ACL id Type Seq Num'
1292 print '------ ---- -------'
1293 for idx
in range(list_info.count):
1294 info = OpEN.OPEN_ACL_VLAN_LIST_INFO_tArray_getitem(list_info.listEntry, idx)
1295 print '%6d %4d %7d' % (info.aclId, info.aclType, info.seqNum)
1297 list_info = OpEN.OPEN_ACL_INTF_DIR_LIST_t()
1298 result = OpEN.openapiAclIntfDirListGet(self.m_client, intf, dir_, list_info)
1299 print_bad_result(result,
"openapiAclIntfDirListGet")
1300 print '\nACL Interface %d, %s details' % (intf,
'Inbound' if dir_ == OpEN.OPEN_ACL_INBOUND_ACL
else 'Outbound')
1301 print 'ACL id Type Seq Num'
1302 print '------ ---- -------'
1303 for idx
in range(list_info.count):
1304 info = OpEN.OPEN_ACL_INTF_LIST_INFO_tArray_getitem(list_info.listEntry, idx)
1305 print '%6d %4d %7d' % (info.aclId, info.aclType, info.seqNum)
1307 def show_acl_details(self, interface_type, acl_type, name, dir_):
1308 """Retrieve a list of ACLs for the given acl name and direction and display its contents"""
1311 name_string = open_.getStringBuffer(len(name) + 1, name)
1312 except OpENBufferSizeError:
1313 print(
"show_acl_details: getStringBuffer raised OpENBufferSizeError")
1316 print(
"show_acl_details: getStringBuffer raised TypeError")
1318 name_buff = OpEN.open_buffdesc()
1319 name_buff.pstart = name_string
1320 name_buff.size = len(name)+1
1322 acl_p = OpEN.new_uint32_tp()
1324 result = OpEN.openapiAclGet(self.m_client, acl_type, name_buff, acl_p)
1325 acl = OpEN.uint32_tp_value(acl_p)
1326 print_bad_result(result,
"openapiAclGet - IP ACL")
1328 if interface_type == VLAN:
1329 list_info = OpEN.OPEN_ACL_ASSIGNED_VLAN_LIST_t()
1330 result = OpEN.openapiAclAssignedVlanDirListGet(self.m_client, acl, dir_, list_info)
1331 print_bad_result(result,
"openapiAclAssignedVlanDirListGet")
1332 print '\nACL %s, %s VLANs' % (name_string.cast(),
'Inbound' if dir_ == OpEN.OPEN_ACL_INBOUND_ACL
else 'Outbound')
1333 if result == OpEN.OPEN_E_NONE:
1336 for idx
in range(list_info.count):
1337 print OpEN.uint32_tArray_getitem(list_info.vlan, idx)
1339 list_info = OpEN.OPEN_ACL_ASSIGNED_INTF_LIST_t()
1340 result = OpEN.openapiAclAssignedIntfDirListGet(self.m_client, acl, dir_, list_info)
1341 print_bad_result(result,
"openapiAclAssignedIntfDirListGet")
1342 print '\nACL %s, %s Interfaces' % (name_string.cast(),
'Inbound' if dir_ == OpEN.OPEN_ACL_INBOUND_ACL
else 'Outbound')
1343 if result == OpEN.OPEN_E_NONE:
1346 for idx
in range(list_info.count):
1347 print OpEN.uint32_tArray_getitem(list_info.intf, idx)
1349 OpEN.delete_uint32_tp(acl_p)
1351 def show_summary(self):
1352 """Retrieve and display miscellaneous info"""
1354 val_p = OpEN.new_uint32_tp()
1358 result = OpEN.openapiCpuIntfGet(self.m_client, val_p)
1359 print_bad_result(result,
"openapiCpuIntfGet")
1360 string =
"CPU Control Plane Interface ... %d" % OpEN.uint32_tp_value(val_p)
1361 print_sanity_results(result, (1 == 1),
"openapiCpuIntfGet", string)
1363 result = OpEN.openapiAclMaxAclIntfCountGet(self.m_client, val_p)
1364 print_bad_result(result,
"openapiAclMaxAclIntfCountGet")
1365 string =
"Number of allowed ACL Interfaces ... %d" % OpEN.uint32_tp_value(val_p)
1366 print_sanity_results(result, (1 == 1),
"openapiAclMaxAclIntfCountGet", string)
1368 result = OpEN.openapiAclMaxAclVlanCountGet(self.m_client, val_p)
1369 print_bad_result(result,
"openapiAclMaxAclVlanCountGet")
1370 string =
"Number of allowed ACL VLANs ... %d" % OpEN.uint32_tp_value(val_p)
1371 print_sanity_results(result, (1 == 1),
"openapiAclMaxAclVlanCountGet", string)
1373 result = OpEN.openapiAclCountGet(self.m_client, val_p)
1374 print_bad_result(result,
"openapiAclCountGet")
1375 string =
"Total number of configured ACLs ... %d" % OpEN.uint32_tp_value(val_p)
1376 print_sanity_results(result, (1 == 1),
"openapiAclCountGet", string)
1378 result = OpEN.openapiAclMacCountGet(self.m_client, val_p)
1379 print_bad_result(result,
"openapiAclMacCountGet")
1380 string =
"Number of configured MAC ACLs ... %d" % OpEN.uint32_tp_value(val_p)
1381 print_sanity_results(result, (1 == 1),
"openapiAclMacCountGet", string)
1383 OpEN.delete_uint32_tp(val_p)
1386 """Demonstrate OpEN usage for ACL APIs"""
1388 ret = open_.connect(
"acl_example")
1389 if ret == OpEN.OPEN_E_NONE :
1390 open_.getNetworkOSVersion()
1391 client = open_.get_client()
1392 example = AclExample(client)
1393 example.test_acl_creation(ACL_LIST)
1394 example.test_acl_retrieval()
1395 example.test_acl_rename(ACL_RENAME_LIST)
1396 example.test_acl_traffic_filter_create(INTERFACE_CREATE_LIST)
1397 example.test_acl_traffic_filter_delete(INTERFACE_DELETE_LIST)
1398 example.test_acl_route_filter(OpEN.OPEN_ACL_TYPE_IP,
'test_ip_renamed')
1399 example.show_interface_summary(OpEN.OPEN_ACL_INBOUND_ACL)
1400 example.show_interface_details(INTF, 1, OpEN.OPEN_ACL_INBOUND_ACL)
1401 example.show_interface_details(VLAN, 6, OpEN.OPEN_ACL_INBOUND_ACL)
1402 example.show_acl_details(INTF, OpEN.OPEN_ACL_TYPE_IP,
'test_ip_renamed', OpEN.OPEN_ACL_INBOUND_ACL)
1403 example.show_acl_details(VLAN, OpEN.OPEN_ACL_TYPE_IP,
'test_ip_renamed', OpEN.OPEN_ACL_INBOUND_ACL)
1404 example.show_summary()
1407 print "Unable to connect"
1409 if __name__ ==
'__main__': main()