Open Ethernet Networking (OpEN) API Guide and Reference Manual  3.6.0.3
acl_example.rb
/*! \file acl_example.rb
*/
#!/mnt/fastpath/usr/bin/ruby
require "OpEN"
require "OpENUtil"
require "socket"
require "ipaddr"
#
# Copyright 2016 Broadcom.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Ruby 1.8.7.
#
# Shared OpENUtil helper instance. The methods below call its getCharBuffer
# to build the character buffers handed to Open_buffdesc arguments.
$open = OpENUtil.new()
# String keys used as the first element of every data record below.
# ACL and RULE (and, by fall-through, the MATCH_* keys) are dispatched on by
# AclExample#test_acl_creation; the MATCH_* keys select the branch taken in
# AclExample#create_match.
ACL = 'acl'
# INTF and VLAN tag the entries of the interface lists further down.
# NOTE: VLAN and MATCH_VLAN are both the string 'vlan'; they are used in
# different tables, so they must not be mixed in one list.
INTF = 'intf'
VLAN = 'vlan'
RULE = 'rule'
# NOTE(review): ACTION_PERMIT / ACTION_DENY are not referenced in the visible
# part of this file (rules use OpEN::OPEN_ACL_PERMIT / OPEN_ACL_DENY) -- confirm use.
ACTION_PERMIT = 'permit'
ACTION_DENY = 'deny'
# Match-condition keys, one per branch of AclExample#create_match.
MATCH_ASSIGN_QUEUE = 'assign-queue'
MATCH_COS = 'cos'
MATCH_COS2 = 'cos2'
MATCH_DSCP = 'dscp'
MATCH_ETHER_TYPE = 'ether-type'
MATCH_EVERY = 'every'
MATCH_FLOW_LABEL = 'flow-label'
MATCH_FRAGMENTS = 'fragments'
MATCH_ICMP_TYPE = 'icmp-type'
MATCH_IGMP_TYPE = 'igmp-type'
MATCH_IP_DST_MASK = 'ip-dst-mask'
MATCH_IP_SRC_MASK = 'ip-src-mask'
MATCH_IPV6_DST_PFX = 'ipv6-dst-pfx'
MATCH_IPV6_SRC_PFX = 'ipv6-src-pfx'
MATCH_L4_DST_PORT = 'l4-dst-port'
MATCH_L4_SRC_PORT = 'l4-src-port'
MATCH_LOGGING = 'logging'
MATCH_MAC_DST_MASK = 'mac-dst-mask'
MATCH_MAC_SRC_MASK = 'mac-src-mask'
MATCH_PACKET_MIRRORING = 'packet-mirroring'
MATCH_PACKET_REDIRECTION = 'packet-redirection'
MATCH_PRECEDENCE = 'precedence'
MATCH_PROTOCOL = 'protocol'
MATCH_RATE_LIMIT = 'rate-limit'
MATCH_REDIRECT_AGENT = 'redirect-agent'
MATCH_ROUTING_FLAG = 'routing-flag'
MATCH_TCP_FLAGS = 'tcp-flags'
MATCH_TIME_BASED = 'time-based'
MATCH_TOS = 'tos'
MATCH_VLAN = 'vlan'
MATCH_VLAN2 = 'vlan2'
# ACL definitions consumed by AclExample#test_acl_creation.
# Each record is [key, args]. Records must appear in ACL, RULE, MATCH order:
# an ACL record starts a new ACL, a RULE record adds a rule to the most recent
# ACL, and every other key adds a match condition to the most recent rule.
# The args array is passed to create_acl / create_rule / create_match.
# An empty args array ([]) for assign-queue, packet-mirroring or
# packet-redirection makes create_match look the value up from the switch.
ACL_LIST = [
# ---- IPv4 ACL ----
[ACL, ['test_ip', OpEN::OPEN_ACL_TYPE_IP]],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_EVERY, [true]],
[MATCH_ASSIGN_QUEUE, []],
[MATCH_LOGGING, [true]],
[MATCH_PACKET_MIRRORING, []],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
# address, mask
[MATCH_IP_SRC_MASK, ['10.10.10.1','255.0.0.0']],
[MATCH_IP_DST_MASK, ['20.20.20.1','255.0.0.0']],
[MATCH_PROTOCOL, [OpEN::OPENAPI_ACL_PROTOCOL_PIM]],
[MATCH_DSCP, [OpEN::OPENAPI_ACL_IP_DSCP_AF11]],
[MATCH_FRAGMENTS, [true]],
[MATCH_PRECEDENCE, [7]],
# value, mask (passed to the API as strings)
[MATCH_TOS, ['a0','a2']],
[MATCH_PACKET_REDIRECTION, []],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_PROTOCOL, [OpEN::OPENAPI_ACL_PROTOCOL_TCP]],
[MATCH_REDIRECT_AGENT, [100]],
[MATCH_RATE_LIMIT, [4294967295,128]],
[MATCH_TIME_BASED, ['ipv4-time-range-name']],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_PROTOCOL, [OpEN::OPENAPI_ACL_PROTOCOL_ICMP]],
# type, code, flag (create_match passes the flag as the first ICMP argument)
[MATCH_ICMP_TYPE, [8,0,true]],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_PROTOCOL, [OpEN::OPENAPI_ACL_PROTOCOL_IGMP]],
[MATCH_IGMP_TYPE, [30]],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_PROTOCOL, [OpEN::OPENAPI_ACL_PROTOCOL_TCP]],
# each entry is used by create_match as a bit position (1 << entry)
[MATCH_TCP_FLAGS, [OpEN::OPENAPI_ACL_TCP_FLAG_FIN,OpEN::OPENAPI_ACL_TCP_FLAG_SYN,OpEN::OPENAPI_ACL_TCP_FLAG_ACK]],
# operator, start port, end port
[MATCH_L4_SRC_PORT, [OpEN::OPEN_ACL_L4_PORT_OPERATOR_EQUAL_TO,80,0]],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_PROTOCOL, [OpEN::OPENAPI_ACL_PROTOCOL_UDP]],
[MATCH_L4_SRC_PORT, [OpEN::OPEN_ACL_L4_PORT_OPERATOR_LESS_THAN,10000,0]],
[MATCH_L4_DST_PORT, [OpEN::OPEN_ACL_L4_PORT_OPERATOR_GREATER_THAN,20000,0]],
[RULE, [OpEN::OPEN_ACL_DENY]],
# ---- IPv6 ACL ----
[ACL, ['test_ip6', OpEN::OPEN_ACL_TYPE_IPV6]],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_EVERY, [true]],
[MATCH_ASSIGN_QUEUE, [3]],
[MATCH_LOGGING, [true]],
[MATCH_PACKET_MIRRORING, []],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_PROTOCOL, [OpEN::OPENAPI_ACL_PROTOCOL_PIM]],
# address, prefix length
[MATCH_IPV6_SRC_PFX, ['1001::',24]],
[MATCH_IPV6_DST_PFX, ['2001::',24]],
[MATCH_DSCP, [OpEN::OPENAPI_ACL_IP_DSCP_AF11]],
[MATCH_FRAGMENTS, [true]],
[MATCH_PACKET_REDIRECTION, [100]],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_PROTOCOL, [OpEN::OPENAPI_ACL_PROTOCOL_TCP]],
[MATCH_REDIRECT_AGENT, [100]],
[MATCH_RATE_LIMIT, [4294967295,128]],
[MATCH_TIME_BASED, ['ipv6-time-range-name']],
[MATCH_FLOW_LABEL, [1048575]],
[MATCH_ROUTING_FLAG, [true]],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_PROTOCOL, [OpEN::OPENAPI_ACL_PROTOCOL_ICMPV6]],
[MATCH_ICMP_TYPE, [1,4,true]],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_PROTOCOL, [OpEN::OPENAPI_ACL_PROTOCOL_TCP]],
[MATCH_TCP_FLAGS, [OpEN::OPENAPI_ACL_TCP_FLAG_RST,OpEN::OPENAPI_ACL_TCP_FLAG_PSH,OpEN::OPENAPI_ACL_TCP_FLAG_URG]],
[MATCH_L4_SRC_PORT, [OpEN::OPEN_ACL_L4_PORT_OPERATOR_RANGE,20000,20010]],
[MATCH_L4_DST_PORT, [OpEN::OPEN_ACL_L4_PORT_OPERATOR_GREATER_THAN,60000,0]],
[RULE, [OpEN::OPEN_ACL_DENY]],
# ---- MAC ACL ----
[ACL, ['test_mac', OpEN::OPEN_ACL_TYPE_MAC]],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_EVERY, [true]],
[MATCH_ASSIGN_QUEUE, [5]],
[MATCH_LOGGING, [true]],
[MATCH_PACKET_MIRRORING, []],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
# MAC address, mask
[MATCH_MAC_SRC_MASK, ['00:11:22:33:44:55','ff:ff:ff:00:00:00']],
[MATCH_MAC_DST_MASK, ['55:44:33:22:11:00','00:00:00:ff:ff:ff']],
[MATCH_PACKET_REDIRECTION, []],
[RULE, [OpEN::OPEN_ACL_PERMIT]],
[MATCH_REDIRECT_AGENT, [100]],
[MATCH_RATE_LIMIT, [4294967295,128]],
[MATCH_TIME_BASED, ['mac-time-range-name']],
[MATCH_ETHER_TYPE, [0x809B]],
[MATCH_COS, [0]],
[MATCH_COS2, [7]],
# operator, start VLAN, end VLAN
[MATCH_VLAN, [OpEN::OPEN_ACL_VLAN_OPERATOR_RANGE,2,100]],
[MATCH_VLAN2, [OpEN::OPEN_ACL_VLAN_OPERATOR_EQUAL_TO,200,0]],
[RULE, [OpEN::OPEN_ACL_DENY]]
]
# ACLs to rename, consumed by AclExample#test_acl_rename.
# Each entry is [acl_type, current_name, new_name]; the current names are the
# ones created from ACL_LIST above.
ACL_RENAME_LIST = [
[OpEN::OPEN_ACL_TYPE_IP, 'test_ip', 'test_ip_renamed'],
[OpEN::OPEN_ACL_TYPE_IPV6, 'test_ip6', 'test_ip6_renamed'],
[OpEN::OPEN_ACL_TYPE_MAC, 'test_mac', 'test_mac_renamed']
]
# ACL-to-interface / ACL-to-VLAN assignments to create.
# Each record is [INTF or VLAN, [acl_type, acl_name, number, direction, value]].
# The ACL names are the post-rename names from ACL_RENAME_LIST.
# NOTE(review): the consumer of this list is not visible here; presumably
# `number` is the interface or VLAN number and the last value is the
# assignment sequence number -- confirm against the code that reads it.
INTERFACE_CREATE_LIST = [
[INTF, [OpEN::OPEN_ACL_TYPE_IP, 'test_ip_renamed', 1, OpEN::OPEN_ACL_INBOUND_ACL, 100]],
[INTF, [OpEN::OPEN_ACL_TYPE_IP, 'test_ip_renamed', 2, OpEN::OPEN_ACL_INBOUND_ACL, 200]],
[INTF, [OpEN::OPEN_ACL_TYPE_IPV6,'test_ip6_renamed',3, OpEN::OPEN_ACL_INBOUND_ACL, 300]],
[INTF, [OpEN::OPEN_ACL_TYPE_MAC, 'test_mac_renamed',4, OpEN::OPEN_ACL_INBOUND_ACL, 400]],
[VLAN, [OpEN::OPEN_ACL_TYPE_IP, 'test_ip_renamed', 5, OpEN::OPEN_ACL_INBOUND_ACL, 500]],
[VLAN, [OpEN::OPEN_ACL_TYPE_IP, 'test_ip_renamed', 6, OpEN::OPEN_ACL_INBOUND_ACL, 600]],
[VLAN, [OpEN::OPEN_ACL_TYPE_IPV6,'test_ip6_renamed',7, OpEN::OPEN_ACL_INBOUND_ACL, 700]],
[VLAN, [OpEN::OPEN_ACL_TYPE_MAC, 'test_mac_renamed',8, OpEN::OPEN_ACL_INBOUND_ACL, 800]],
[INTF, [OpEN::OPEN_ACL_TYPE_IP, 'test_ip_renamed', 9, OpEN::OPEN_ACL_INBOUND_ACL, 900]]
]
# ACL-to-interface / ACL-to-VLAN assignments to delete.
# Each record is [INTF or VLAN, [acl_type, acl_name, number, direction]]; both
# entries repeat a (type, name, number, direction) combination that also
# appears in INTERFACE_CREATE_LIST.
# NOTE(review): the consumer of this list is not visible here -- confirm the
# meaning of `number` against the code that reads it.
INTERFACE_DELETE_LIST = [
[INTF, [OpEN::OPEN_ACL_TYPE_IP, 'test_ip_renamed', 2, OpEN::OPEN_ACL_INBOUND_ACL]],
[VLAN, [OpEN::OPEN_ACL_TYPE_IP, 'test_ip_renamed', 5, OpEN::OPEN_ACL_INBOUND_ACL]]
]
def print_sanity_results(result, test, msg, feat)
  # Report the outcome of one sanity check on stdout.
  #
  # result - OpEN return code of the call being verified
  # test   - comparison outcome; only the literal `true` counts as a pass
  # msg    - name of the step being checked
  # feat   - feature / object the step was applied to
  #
  # An OPEN_E_UNAVAIL result is reported as skipped regardless of `test`.
  return puts("Sanity test skipped.") if result == OpEN::OPEN_E_UNAVAIL

  if result == OpEN::OPEN_E_NONE && test == true
    puts "Sanity Success - #{msg} - #{feat}."
  else
    puts "Sanity Failure - #{msg} - #{feat}."
  end
end
def print_bad_result(result, msg)
  # Print a diagnostic on stdout when an OpEN call did not succeed.
  #
  # result - OpEN return code to inspect
  # msg    - name of the API call, included in the message
  #
  # OPEN_E_UNAVAIL is reported as an unsupported feature, any other code that
  # is not OPEN_E_NONE as a test failure; a successful result prints nothing.
  if result == OpEN::OPEN_E_UNAVAIL
    puts "Feature not supported - #{msg} (err #{result})."
    return
  end

  puts "Test Failure - #{msg} (err #{result})." if result != OpEN::OPEN_E_NONE
end
class AclExample
#Simple ACL class implementing basic CRUD examples
def initialize(client)
  # Remember the OpEN client handle; every API call in this class passes it
  # as the first argument.
  @client = client
end
def create_acl(acl_name, acl_type, init=nil)
  # Create a named ACL of the given type and read it back for verification.
  #
  # acl_name - name of the ACL to create
  # acl_type - OpEN ACL type constant (e.g. OpEN::OPEN_ACL_TYPE_IP)
  # init     - when truthy, delete any ACL of that name/type first
  #
  # Returns the new ACL id, or 0 when either the create or the read-back
  # call did not return OPEN_E_NONE.
  name_string = $open.getCharBuffer(acl_name.length, acl_name)
  name_buff = OpEN::Open_buffdesc.new
  name_buff.pstart = name_string
  name_buff.size = acl_name.length+1

  # Delete previous ACL (result deliberately ignored: it may not exist)
  OpEN.openapiAclDeleteByName(@client, acl_type, name_buff) if init

  # Create new ACL
  acl_id = 0
  acl_p = OpEN.new_uint32_tp()
  result = OpEN.openapiAclCreate(@client, acl_type, name_buff, acl_p)
  print_bad_result(result, 'openapiAclCreate')
  # The original `if ... :` (colon instead of `then`) is only accepted by
  # Ruby 1.8; it is a syntax error on 1.9 and later.
  if result == OpEN::OPEN_E_NONE
    # Fetch newly created ACL for demonstration
    acl_id = OpEN.uint32_tp_value(acl_p)
    tmp_acl_p = OpEN.new_uint32_tp()
    result = OpEN.openapiAclGet(@client, acl_type, name_buff, tmp_acl_p)
    print_bad_result(result, 'openapiAclGet')
    print_sanity_results(result, acl_id==OpEN.uint32_tp_value(tmp_acl_p), 'create_acl', acl_name)
    OpEN.delete_uint32_tp(tmp_acl_p)
  end
  OpEN.delete_uint32_tp(acl_p)

  if result == OpEN::OPEN_E_NONE
    return acl_id
  else
    return 0
  end
end
def create_rule(acl_id, rule_id, action)
  # Add an action rule to the ACL identified by acl_id and read it back.
  #
  # acl_id  - id of the owning ACL; 0 means "no ACL" and is rejected
  # rule_id - id of the previous rule, or 0 when this is the first rule
  # action  - OpEN ACL action constant for the rule
  #
  # Returns the rule id reported by the API, or 0 when acl_id is 0 or when
  # the add / read-back call did not return OPEN_E_NONE.
  return 0 if acl_id == 0                 # exit if invalid id

  rule_id += 1 unless rule_id == 0        # increment if not first rule

  # Create new action rule; the API reads and writes the id through the holder
  rule_holder = OpEN.new_uint32_tp()
  OpEN.uint32_tp_assign(rule_holder, rule_id)
  result = OpEN.openapiAclRuleActionAdd(@client, acl_id, rule_holder, action)
  print_bad_result(result, 'openapiAclRuleActionAdd')

  if result == OpEN::OPEN_E_NONE
    # Fetch newly created rule for demonstration
    rule_id = OpEN.uint32_tp_value(rule_holder)
    action_holder = OpEN.new_OPEN_ACL_ACTION_tp()
    result = OpEN.openapiAclRuleActionGet(@client, acl_id, rule_id, action_holder)
    print_bad_result(result, 'openapiAclRuleActionGet')
    stored_action = OpEN.OPEN_ACL_ACTION_tp_value(action_holder)
    print_sanity_results(result, action == stored_action, 'create_rule', "")
    OpEN.delete_OPEN_ACL_ACTION_tp(action_holder)
  end
  OpEN.delete_uint32_tp(rule_holder)

  result == OpEN::OPEN_E_NONE ? rule_id : 0
end
def create_match(acl_id, rule_id, match, conditions)
  # Add one match condition to the given ACL rule, then read it back and
  # report whether the stored value equals what was written.
  #
  # acl_id     - id of the ACL; 0 is treated as invalid
  # rule_id    - id of the rule inside that ACL; 0 is treated as invalid
  # match      - one of the MATCH_* keys; selects the branch below
  # conditions - array of arguments for that match type (see ACL_LIST)
  #
  # Returns 0 when acl_id or rule_id is 0. Otherwise the return value is that
  # of the last statement of the branch taken and carries no meaning; the
  # caller in this file ignores it. A key that matches no branch is a no-op.
  #
  # Every branch has the same shape: call the ...Add API, print a diagnostic
  # on failure, and on success call the matching ...Get API and compare.
  return 0 if acl_id == 0 || rule_id == 0

  if match == MATCH_ASSIGN_QUEUE
    if conditions[0]
      # Use the queue if supplied
      val = conditions[0]
    else
      # Else retrieve the value reported by openapiAclMaxAssignQueueGet for demonstration
      val_p = OpEN.new_uint32_tp()
      OpEN.openapiAclMaxAssignQueueGet(@client, val_p)
      val = OpEN.uint32_tp_value(val_p)
      OpEN.delete_uint32_tp(val_p)
    end
    result = OpEN.openapiAclRuleMatchAssignQueueAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchAssignQueueAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchAssignQueueGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchAssignQueueGet')
      print_sanity_results(result, val == OpEN.uint32_tp_value(val_p), 'create_match', match)
      OpEN.delete_uint32_tp(val_p)
    end
  elsif match == MATCH_COS
    # MAC ACLs only
    val = conditions[0]
    result = OpEN.openapiAclRuleMatchMacCosAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchMacCosAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchMacCosGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchMacCosGet')
      print_sanity_results(result, val == OpEN.uint32_tp_value(val_p), 'create_match', match)
      OpEN.delete_uint32_tp(val_p)
    end
  elsif match == MATCH_COS2
    # MAC ACLs only
    val = conditions[0]
    result = OpEN.openapiAclRuleMatchMacCos2Add(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchMacCos2Add')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchMacCos2Get(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchMacCos2Get')
      print_sanity_results(result, val == OpEN.uint32_tp_value(val_p), 'create_match', match)
      OpEN.delete_uint32_tp(val_p)
    end
  elsif match == MATCH_DSCP
    val = conditions[0]
    result = OpEN.openapiAclRuleMatchDscpAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchDscpAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchDscpGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchDscpGet')
      print_sanity_results(result, val == OpEN.uint32_tp_value(val_p), 'create_match', match)
      OpEN.delete_uint32_tp(val_p)
    end
  elsif match == MATCH_ETHER_TYPE
    val = conditions[0]
    result = OpEN.openapiAclRuleMatchMacEtherTypeAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchMacEtherTypeAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchMacEtherTypeGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchMacEtherTypeGet')
      print_sanity_results(result, val == OpEN.uint32_tp_value(val_p), 'create_match', match)
      OpEN.delete_uint32_tp(val_p)
    end
  elsif match == MATCH_FLOW_LABEL
    # IPv6 ACLs only
    val = conditions[0]
    result = OpEN.openapiAclRuleMatchFlowLabelAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchFlowLabelAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchFlowLabelGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchFlowLabelGet')
      print_sanity_results(result, val == OpEN.uint32_tp_value(val_p), 'create_match', match)
      OpEN.delete_uint32_tp(val_p)
    end
  elsif match == MATCH_FRAGMENTS
    val = conditions[0]
    result = OpEN.openapiAclRuleMatchFragmentsAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchFragmentsAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_bool_tp()
      result = OpEN.openapiAclRuleMatchFragmentsGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchFragmentsGet')
      print_sanity_results(result, val == OpEN.bool_tp_value(val_p), 'create_match', match)
      OpEN.delete_bool_tp(val_p)
    end
  elsif match == MATCH_EVERY
    val = conditions[0]
    result = OpEN.openapiAclRuleMatchEveryAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchEveryAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_bool_tp()
      result = OpEN.openapiAclRuleMatchEveryGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchEveryGet')
      print_sanity_results(result, val == OpEN.bool_tp_value(val_p), 'create_match', match)
      OpEN.delete_bool_tp(val_p)
    end
  elsif match == MATCH_ICMP_TYPE
    # conditions: [type, code, flag]; the flag is passed ahead of type/code
    type = conditions[0]
    code = conditions[1]
    bool = conditions[2]
    result = OpEN.openapiAclRuleMatchIcmpTypeCodeAdd(@client, acl_id, rule_id, bool, type, code)
    print_bad_result(result, 'openapiAclRuleMatchIcmpTypeCodeAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created rule for comparison
      type_p = OpEN.new_uint32_tp()
      code_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchIcmpTypeCodeGet(@client, acl_id, rule_id, bool, type_p, code_p)
      print_bad_result(result, 'openapiAclRuleMatchIcmpTypeCodeGet')
      tmp_type = OpEN.uint32_tp_value(type_p)
      tmp_code = OpEN.uint32_tp_value(code_p)
      print_sanity_results(result, (type == tmp_type && code == tmp_code), 'create_match', match)
      OpEN.delete_uint32_tp(type_p)
      OpEN.delete_uint32_tp(code_p)
    end
  elsif match == MATCH_IGMP_TYPE
    type = conditions[0]
    result = OpEN.openapiAclRuleMatchIgmpTypeAdd(@client, acl_id, rule_id, type)
    print_bad_result(result, 'openapiAclRuleMatchIgmpTypeAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created rule for comparison
      type_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchIgmpTypeGet(@client, acl_id, rule_id, type_p)
      print_bad_result(result, 'openapiAclRuleMatchIgmpTypeGet')
      tmp_type = OpEN.uint32_tp_value(type_p)
      print_sanity_results(result, type == tmp_type, 'create_match', match)
      OpEN.delete_uint32_tp(type_p)
    end
  elsif match == MATCH_IP_DST_MASK
    # conditions: [address, mask] as dotted-quad strings
    dst_addr = OpEN::Open_inet_addr_t.new
    dst_addr.addr.ipv4 = IPAddr.new(conditions[0]).to_i
    dst_addr.family = Socket::AF_INET
    mask_addr = OpEN::Open_inet_addr_t.new
    mask_addr.addr.ipv4 = IPAddr.new(conditions[1]).to_i
    mask_addr.family = Socket::AF_INET
    result = OpEN.openapiAclRuleMatchIpDstMaskAdd(@client, acl_id, rule_id, dst_addr, mask_addr)
    print_bad_result(result, 'openapiAclRuleMatchIpDstMaskAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison (only the address is compared)
      tmp_dst_addr = OpEN::Open_inet_addr_t.new
      tmp_dst_mask = OpEN::Open_inet_addr_t.new
      result = OpEN.openapiAclRuleMatchIpDstMaskGet(@client, acl_id, rule_id, tmp_dst_addr, tmp_dst_mask)
      print_bad_result(result, 'openapiAclRuleMatchIpDstMaskGet')
      print_sanity_results(result, dst_addr.addr.ipv4 == tmp_dst_addr.addr.ipv4, 'create_match', match)
    end
  elsif match == MATCH_IP_SRC_MASK
    # conditions: [address, mask] as dotted-quad strings
    src_addr = OpEN::Open_inet_addr_t.new
    src_addr.addr.ipv4 = IPAddr.new(conditions[0]).to_i
    src_addr.family = Socket::AF_INET
    mask_addr = OpEN::Open_inet_addr_t.new
    mask_addr.addr.ipv4 = IPAddr.new(conditions[1]).to_i
    mask_addr.family = Socket::AF_INET
    result = OpEN.openapiAclRuleMatchIpSrcMaskAdd(@client, acl_id, rule_id, src_addr, mask_addr)
    print_bad_result(result, 'openapiAclRuleMatchIpSrcMaskAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison (only the address is compared)
      tmp_src_addr = OpEN::Open_inet_addr_t.new
      tmp_src_mask = OpEN::Open_inet_addr_t.new
      result = OpEN.openapiAclRuleMatchIpSrcMaskGet(@client, acl_id, rule_id, tmp_src_addr, tmp_src_mask)
      print_bad_result(result, 'openapiAclRuleMatchIpSrcMaskGet')
      print_sanity_results(result, src_addr.addr.ipv4 == tmp_src_addr.addr.ipv4, 'create_match', match)
    end
  elsif match == MATCH_IPV6_DST_PFX
    # conditions: [address, prefix length]
    begin
      addr = OpEN::Open_inet_addr_t.new
      addr.addr.ipv6 = IPAddr.new(conditions[0]).to_i
      addr.family = Socket::AF_INET6
      pfx = conditions[1]
      result = OpEN.openapiAclRuleMatchIpv6DstPfxAdd(@client, acl_id, rule_id, addr, pfx)
      print_bad_result(result, 'openapiAclRuleMatchIpv6DstPfxAdd')
      if result == OpEN::OPEN_E_NONE
        # Fetch newly created match for comparison (only the address is compared)
        tmp_addr = OpEN::Open_inet_addr_t.new
        tmp_pfx_p = OpEN.new_uint32_tp()
        result = OpEN.openapiAclRuleMatchIpv6DstPfxGet(@client, acl_id, rule_id, tmp_addr, tmp_pfx_p)
        print_bad_result(result, 'openapiAclRuleMatchIpv6DstPfxGet')
        print_sanity_results(result, addr.addr.ipv6 == tmp_addr.addr.ipv6, 'create_match', match)
        OpEN.delete_uint32_tp(tmp_pfx_p)
      end
    rescue TypeError
      # A TypeError from the binding is reported as a failed sanity check
      print_sanity_results(OpEN::OPEN_E_ERROR, false, 'create_match', match + " IPV6 IS MOST LIKELY DISABLED")
    end
  elsif match == MATCH_IPV6_SRC_PFX
    # conditions: [address, prefix length]
    begin
      addr = OpEN::Open_inet_addr_t.new
      addr.addr.ipv6 = IPAddr.new(conditions[0]).to_i
      addr.family = Socket::AF_INET6
      pfx = conditions[1]
      # Uses @client like every other branch; the former `self.m_client` is not
      # a method of this class and raised NoMethodError.
      result = OpEN.openapiAclRuleMatchIpv6SrcPfxAdd(@client, acl_id, rule_id, addr, pfx)
      print_bad_result(result, 'openapiAclRuleMatchIpv6SrcPfxAdd')
      if result == OpEN::OPEN_E_NONE
        # Fetch newly created match for comparison (only the address is compared)
        tmp_addr = OpEN::Open_inet_addr_t.new
        tmp_pfx_p = OpEN.new_uint32_tp()
        result = OpEN.openapiAclRuleMatchIpv6SrcPfxGet(@client, acl_id, rule_id, tmp_addr, tmp_pfx_p)
        print_bad_result(result, 'openapiAclRuleMatchIpv6SrcPfxGet')
        print_sanity_results(result, addr.addr.ipv6 == tmp_addr.addr.ipv6, 'create_match', match)
        OpEN.delete_uint32_tp(tmp_pfx_p)
      end
    rescue TypeError
      # A TypeError from the binding is reported as a failed sanity check
      print_sanity_results(OpEN::OPEN_E_ERROR, false, 'create_match', match + " IPV6 IS MOST LIKELY DISABLED")
    end
  elsif match == MATCH_L4_DST_PORT
    # conditions: [operator, start port, end port]
    oper = conditions[0]
    start = conditions[1]
    end_ = conditions[2]
    result = OpEN.openapiAclRuleMatchL4DstPortAdd(@client, acl_id, rule_id, oper, start, end_)
    print_bad_result(result, 'openapiAclRuleMatchL4DstPortAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison (operator and start port only)
      oper_p = OpEN.new_OPEN_ACL_L4_PORT_OPERATOR_tp()
      start_p = OpEN.new_uint32_tp()
      end_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchL4DstPortGet(@client, acl_id, rule_id, oper_p, start_p, end_p)
      print_bad_result(result, 'openapiAclRuleMatchL4DstPortGet')
      print_sanity_results(result, (oper == OpEN::OPEN_ACL_L4_PORT_OPERATOR_tp_value(oper_p) && start == OpEN.uint32_tp_value(start_p)), 'create_match', match)
      OpEN.delete_OPEN_ACL_L4_PORT_OPERATOR_tp(oper_p)
      OpEN.delete_uint32_tp(start_p)
      OpEN.delete_uint32_tp(end_p)
    end
  elsif match == MATCH_L4_SRC_PORT
    # conditions: [operator, start port, end port]
    oper = conditions[0]
    start = conditions[1]
    end_ = conditions[2]
    result = OpEN.openapiAclRuleMatchL4SrcPortAdd(@client, acl_id, rule_id, oper, start, end_)
    print_bad_result(result, 'openapiAclRuleMatchL4SrcPortAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison (operator and start port only)
      oper_p = OpEN.new_OPEN_ACL_L4_PORT_OPERATOR_tp()
      start_p = OpEN.new_uint32_tp()
      end_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchL4SrcPortGet(@client, acl_id, rule_id, oper_p, start_p, end_p)
      print_bad_result(result, 'openapiAclRuleMatchL4SrcPortGet')
      print_sanity_results(result, (oper == OpEN::OPEN_ACL_L4_PORT_OPERATOR_tp_value(oper_p) && start == OpEN.uint32_tp_value(start_p)), 'create_match', match)
      OpEN.delete_OPEN_ACL_L4_PORT_OPERATOR_tp(oper_p)
      OpEN.delete_uint32_tp(start_p)
      OpEN.delete_uint32_tp(end_p)
    end
  elsif match == MATCH_LOGGING
    val = conditions[0]
    result = OpEN.openapiAclRuleMatchLoggingAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchLoggingAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      # NOTE(review): ACL_LIST supplies `true` here but the value is read back
      # through a uint32 holder, and `true == <Integer>` is false in Ruby --
      # confirm the Get signature; a bool holder may be intended.
      val_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchLoggingGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchLoggingGet')
      print_sanity_results(result, val == OpEN.uint32_tp_value(val_p), 'create_match', match)
      OpEN.delete_uint32_tp(val_p)
    end
  elsif match == MATCH_MAC_DST_MASK
    # MAC ACLs only; conditions: [MAC address, mask] as strings
    mac = conditions[0]
    mask = conditions[1]
    mac_array = $open.getCharBuffer(mac.length, mac)
    mac_buff = OpEN::Open_buffdesc.new
    mac_buff.pstart = mac_array
    mac_buff.size = mac.length+1
    mask_array = $open.getCharBuffer(mask.length, mask)
    mask_buff = OpEN::Open_buffdesc.new
    mask_buff.pstart = mask_array
    mask_buff.size = mask.length+1
    result = OpEN.openapiAclRuleMatchMacDstMacAdd(@client, acl_id, rule_id, mac_buff, mask_buff)
    print_bad_result(result, 'openapiAclRuleMatchMacDstMacAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created rule for comparison
      max_len = 18
      tmp_mac_buff_string = $open.getCharBuffer(max_len)
      tmpmac_buff = OpEN::Open_buffdesc.new
      tmpmac_buff.pstart = tmp_mac_buff_string
      tmpmac_buff.size = max_len
      tmp_mask_buff_string = $open.getCharBuffer(max_len)
      tmp_mask_buff = OpEN::Open_buffdesc.new
      tmp_mask_buff.pstart = tmp_mask_buff_string
      tmp_mask_buff.size = max_len
      result = OpEN.openapiAclRuleMatchMacDstMacGet(@client, acl_id, rule_id, tmpmac_buff, tmp_mask_buff)
      print_bad_result(result, 'openapiAclRuleMatchMacDstMacGet')
      tmp_mac = tmp_mac_buff_string.cast()
      tmp_mask = tmp_mask_buff_string.cast()
      print_sanity_results(result, (mac == tmp_mac && mask == tmp_mask), 'create_match', match)
    end
  elsif match == MATCH_MAC_SRC_MASK
    # MAC ACLs only; conditions: [MAC address, mask] as strings
    mac = conditions[0]
    mask = conditions[1]
    mac_array = $open.getCharBuffer(mac.length, mac)
    mac_buff = OpEN::Open_buffdesc.new
    mac_buff.pstart = mac_array
    mac_buff.size = mac.length+1
    mask_array = $open.getCharBuffer(mask.length, mask)
    mask_buff = OpEN::Open_buffdesc.new
    mask_buff.pstart = mask_array
    mask_buff.size = mask.length+1
    result = OpEN.openapiAclRuleMatchMacSrcMacAdd(@client, acl_id, rule_id, mac_buff, mask_buff)
    print_bad_result(result, 'openapiAclRuleMatchMacSrcMacAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created rule for comparison
      max_len = 18
      tmp_mac_buff_string = $open.getCharBuffer(max_len)
      tmpmac_buff = OpEN::Open_buffdesc.new
      tmpmac_buff.pstart = tmp_mac_buff_string
      tmpmac_buff.size = max_len
      tmp_mask_buff_string = $open.getCharBuffer(max_len)
      tmp_mask_buff = OpEN::Open_buffdesc.new
      tmp_mask_buff.pstart = tmp_mask_buff_string
      tmp_mask_buff.size = max_len
      result = OpEN.openapiAclRuleMatchMacSrcMacGet(@client, acl_id, rule_id, tmpmac_buff, tmp_mask_buff)
      print_bad_result(result, 'openapiAclRuleMatchMacSrcMacGet')
      tmp_mac = tmp_mac_buff_string.cast()
      tmp_mask = tmp_mask_buff_string.cast()
      print_sanity_results(result, (mac == tmp_mac && mask == tmp_mask), 'create_match', match)
    end
  elsif match == MATCH_PACKET_MIRRORING
    if conditions[0]
      # Use interface if supplied
      val = conditions[0]
    else
      # Else retrieve first available physical interface for demonstration
      val_p = OpEN.new_uint32_tp()
      OpEN.openapiIfFirstGet(@client, OpEN::OPEN_INTF_TYPE_PHY, val_p)
      val = OpEN.uint32_tp_value(val_p)
      OpEN.delete_uint32_tp(val_p)
    end
    result = OpEN.openapiAclRuleMatchMirrorAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchMirrorAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchMirrorGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchMirrorGet')
      print_sanity_results(result, val == OpEN.uint32_tp_value(val_p), 'create_match', match)
      OpEN.delete_uint32_tp(val_p)
    end
  elsif match == MATCH_PACKET_REDIRECTION
    if conditions[0]
      # Use interface if supplied
      val = conditions[0]
    else
      # Else retrieve first available physical interface for demonstration
      val_p = OpEN.new_uint32_tp()
      OpEN.openapiIfFirstGet(@client, OpEN::OPEN_INTF_TYPE_PHY, val_p)
      val = OpEN.uint32_tp_value(val_p)
      # Release the temporary holder, as the mirroring branch does
      OpEN.delete_uint32_tp(val_p)
    end
    result = OpEN.openapiAclRuleMatchRedirectAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchRedirectAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchRedirectGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchRedirectGet')
      print_sanity_results(result, val == OpEN.uint32_tp_value(val_p), 'create_match', match)
      OpEN.delete_uint32_tp(val_p)
    end
  elsif match == MATCH_PRECEDENCE
    val = conditions[0]
    result = OpEN.openapiAclRuleMatchPrecedenceAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchPrecedenceAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchPrecedenceGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchPrecedenceGet')
      print_sanity_results(result, val == OpEN.uint32_tp_value(val_p), 'create_match', match)
      OpEN.delete_uint32_tp(val_p)
    end
  elsif match == MATCH_PROTOCOL
    val = conditions[0]
    result = OpEN.openapiAclRuleMatchProtocolAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchProtocolAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchProtocolGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchProtocolGet')
      print_sanity_results(result, val == OpEN.uint32_tp_value(val_p), 'create_match', match)
      OpEN.delete_uint32_tp(val_p)
    end
  elsif match == MATCH_RATE_LIMIT
    # conditions: two numeric arguments, passed through in order
    val1 = conditions[0]
    val2 = conditions[1]
    result = OpEN.openapiAclRuleMatchRateLimitAdd(@client, acl_id, rule_id, val1, val2)
    print_bad_result(result, 'openapiAclRuleMatchRateLimitAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created rule for comparison
      val1_p = OpEN.new_uint32_tp()
      val2_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchRateLimitGet(@client, acl_id, rule_id, val1_p, val2_p)
      print_bad_result(result, 'openapiAclRuleMatchRateLimitGet')
      print_sanity_results(result, (val1 == OpEN.uint32_tp_value(val1_p) && val2 == OpEN.uint32_tp_value(val2_p)), 'create_match', match)
      OpEN.delete_uint32_tp(val1_p)
      OpEN.delete_uint32_tp(val2_p)
    end
  elsif match == MATCH_REDIRECT_AGENT
    val = conditions[0]
    result = OpEN.openapiAclRuleMatchRedirectAgentAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchRedirectAgentAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchRedirectAgentGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchRedirectAgentGet')
      print_sanity_results(result, val == OpEN.uint32_tp_value(val_p), 'create_match', match)
      OpEN.delete_uint32_tp(val_p)
    end
  elsif match == MATCH_ROUTING_FLAG
    # IPv6 ACLs only
    val = conditions[0]
    result = OpEN.openapiAclRuleMatchRoutingAdd(@client, acl_id, rule_id, val)
    print_bad_result(result, 'openapiAclRuleMatchRoutingAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison
      val_p = OpEN.new_bool_tp()
      result = OpEN.openapiAclRuleMatchRoutingGet(@client, acl_id, rule_id, val_p)
      print_bad_result(result, 'openapiAclRuleMatchRoutingGet')
      print_sanity_results(result, val == OpEN.bool_tp_value(val_p), 'create_match', match)
      OpEN.delete_bool_tp(val_p)
    end
  elsif match == MATCH_TCP_FLAGS
    # Each condition is a bit position; value and mask get the same bits set
    flag_val = 0
    flag_mask = 0
    conditions.each do |bit|
      flag_val |= (1 << bit)
      flag_mask |= (1 << bit)
    end
    result = OpEN.openapiAclRuleMatchTcpFlagsAdd(@client, acl_id, rule_id, flag_val, flag_mask)
    print_bad_result(result, 'openapiAclRuleMatchTcpFlagsAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison (only the value is compared)
      flag_val_p = OpEN.new_uint32_tp()
      flag_mask_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchTcpFlagsGet(@client, acl_id, rule_id, flag_val_p, flag_mask_p)
      print_bad_result(result, 'openapiAclRuleMatchTcpFlagsGet')
      print_sanity_results(result, flag_val == OpEN.uint32_tp_value(flag_val_p), 'create_match', match)
      OpEN.delete_uint32_tp(flag_val_p)
      OpEN.delete_uint32_tp(flag_mask_p)
    end
  elsif match == MATCH_TIME_BASED
    # conditions: [time range name]
    time_name = conditions[0]
    name_array = $open.getCharBuffer(time_name.length, time_name)
    name_buff = OpEN::Open_buffdesc.new
    name_buff.pstart = name_array
    name_buff.size = time_name.length+1
    result = OpEN.openapiAclRuleMatchTimeRangeAdd(@client, acl_id, rule_id, name_buff)
    print_bad_result(result, 'openapiAclRuleMatchTimeRangeAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created rule for comparison
      max_len = 32
      tmp_name_string = $open.getCharBuffer(max_len)
      tmp_name_buff = OpEN::Open_buffdesc.new
      tmp_name_buff.pstart = tmp_name_string
      tmp_name_buff.size = max_len
      result = OpEN.openapiAclRuleMatchTimeRangeGet(@client, acl_id, rule_id, tmp_name_buff)
      print_bad_result(result, 'openapiAclRuleMatchTimeRangeGet')
      tmp_time_name = tmp_name_string.cast()
      print_sanity_results(result, time_name == tmp_time_name, 'create_match', match)
    end
    if result == OpEN::OPEN_E_NONE
      # Also exercise the status query; only the return code is checked
      status_p = OpEN.new_OPEN_ACL_RULE_STATUS_tp()
      result = OpEN.openapiAclRuleMatchTimeRangeStatusGet(@client, acl_id, rule_id, status_p)
      print_bad_result(result, 'openapiAclRuleMatchTimeRangeStatusGet')
      print_sanity_results(result, true, 'create_match', match+' status')
      OpEN.delete_OPEN_ACL_RULE_STATUS_tp(status_p)
    end
  elsif match == MATCH_TOS
    # conditions: [value, mask] as strings
    tos_val = conditions[0]
    tos_mask = conditions[1]
    tos_val_array = $open.getCharBuffer(tos_val.length, tos_val)
    tos_val_buff = OpEN::Open_buffdesc.new
    tos_val_buff.pstart = tos_val_array
    tos_val_buff.size = tos_val.length+1
    tos_mask_array = $open.getCharBuffer(tos_mask.length, tos_mask)
    tos_mask_buff = OpEN::Open_buffdesc.new
    tos_mask_buff.pstart = tos_mask_array
    tos_mask_buff.size = tos_mask.length+1
    result = OpEN.openapiAclRuleMatchTosAdd(@client, acl_id, rule_id, tos_val_buff, tos_mask_buff)
    print_bad_result(result, 'openapiAclRuleMatchTosAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created rule for comparison
      max_len = 32
      tmp_tos_buffstring = $open.getCharBuffer(max_len)
      tmp_tos_buff = OpEN::Open_buffdesc.new
      tmp_tos_buff.pstart = tmp_tos_buffstring
      tmp_tos_buff.size = max_len
      tmp_mask_buff_string = $open.getCharBuffer(max_len)
      tmp_mask_buff = OpEN::Open_buffdesc.new
      tmp_mask_buff.pstart = tmp_mask_buff_string
      tmp_mask_buff.size = max_len
      result = OpEN.openapiAclRuleMatchTosGet(@client, acl_id, rule_id, tmp_tos_buff, tmp_mask_buff)
      print_bad_result(result, 'openapiAclRuleMatchTosGet')
      tmp_tos_val = tmp_tos_buffstring.cast()
      tmp_tos_mask = tmp_mask_buff_string.cast()
      print_sanity_results(result, (tos_val == tmp_tos_val && tos_mask == tmp_tos_mask), 'create_match', match)
    end
  elsif match == MATCH_VLAN
    # MAC ACLs only; conditions: [operator, start VLAN, end VLAN]
    oper = conditions[0]
    start = conditions[1]
    end_ = conditions[2]
    result = OpEN.openapiAclRuleMatchMacVlanAdd(@client, acl_id, rule_id, oper, start, end_)
    print_bad_result(result, 'openapiAclRuleMatchMacVlanAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison (operator and start only)
      oper_p = OpEN.new_OPEN_ACL_VLAN_OPERATOR_tp()
      start_p = OpEN.new_uint32_tp()
      end_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchMacVlanGet(@client, acl_id, rule_id, oper_p, start_p, end_p)
      print_bad_result(result, 'openapiAclRuleMatchMacVlanGet')
      # Read the holder with the accessor of the type it was allocated with
      print_sanity_results(result, (oper == OpEN::OPEN_ACL_VLAN_OPERATOR_tp_value(oper_p) && start == OpEN.uint32_tp_value(start_p)), 'create_match', match)
      OpEN.delete_OPEN_ACL_VLAN_OPERATOR_tp(oper_p)
      OpEN.delete_uint32_tp(start_p)
      OpEN.delete_uint32_tp(end_p)
    end
  elsif match == MATCH_VLAN2
    # MAC ACLs only; conditions: [operator, start VLAN, end VLAN]
    oper = conditions[0]
    start = conditions[1]
    end_ = conditions[2]
    result = OpEN.openapiAclRuleMatchMacSecondaryVlanAdd(@client, acl_id, rule_id, oper, start, end_)
    print_bad_result(result, 'openapiAclRuleMatchMacSecondaryVlanAdd')
    if result == OpEN::OPEN_E_NONE
      # Fetch newly created match for comparison (operator and start only)
      oper_p = OpEN.new_OPEN_ACL_VLAN_OPERATOR_tp()
      start_p = OpEN.new_uint32_tp()
      end_p = OpEN.new_uint32_tp()
      result = OpEN.openapiAclRuleMatchMacSecondaryVlanGet(@client, acl_id, rule_id, oper_p, start_p, end_p)
      print_bad_result(result, 'openapiAclRuleMatchMacSecondaryVlanGet')
      # Read the holder with the accessor of the type it was allocated with
      print_sanity_results(result, (oper == OpEN::OPEN_ACL_VLAN_OPERATOR_tp_value(oper_p) && start == OpEN.uint32_tp_value(start_p)), 'create_match', match)
      OpEN.delete_OPEN_ACL_VLAN_OPERATOR_tp(oper_p)
      OpEN.delete_uint32_tp(start_p)
      OpEN.delete_uint32_tp(end_p)
    end
  end
end
def test_acl_creation(acls)
  # Walk the ACL description list and call the create_* helper for each key.
  #
  # acls - list of flat arrays that alternate a String key with an Array of
  #        arguments for that key. An ACL key passes its arguments to
  #        create_acl, a RULE key to create_rule (using the current acl_id),
  #        and any other key to create_match for the current acl_id/rule_id.
  #
  # The list is only read here. Plain `each` is used because `collect!` would
  # overwrite every entry of the caller's list with the helper return values.
  acl_id = 0
  rule_id = 0
  key = ''
  acls.each do |acl|
    acl.each do |val|
      case val
      when String
        # Remember the keyword; its argument array is the next entry.
        key = val
      when Array
        case key
        when ACL then acl_id = create_acl(val[0], val[1], true)
        when RULE then rule_id = create_rule(acl_id, rule_id, val[0])
        else create_match(acl_id, rule_id, key, val)
        end
      end
    end
  end
end
def test_acl_retrieval()
  # Exercise the generic ACL lookup calls: fetch the first IP ACL, create a
  # throw-away ACL, walk the IP ACL list with GetNext and delete the entry the
  # walk ends on.
  id_ptr = OpEN.new_uint32_tp()
  ip_type = OpEN::OPEN_ACL_TYPE_IP
  # The first IP ACL is expected to report a non-zero id.
  rc = OpEN.openapiAclGetFirst(@client, ip_type, id_ptr)
  print_bad_result(rc, "openapiAclGetFirst - IP ACL")
  print_sanity_results(rc, OpEN.uint32_tp_value(id_ptr) > 0, "openapiAclGetFirst", "IP ACL")
  # Temporary ACL so the iteration below has one more entry to visit.
  last_id = self.create_acl("another_test_ip", ip_type, true)
  if last_id > 0
    # Restart from id 0 and keep the most recent id GetNext hands back.
    last_id = 0
    loop do
      break unless OpEN.openapiAclGetNext(@client, ip_type, last_id, id_ptr) == OpEN::OPEN_E_NONE
      last_id = OpEN.uint32_tp_value(id_ptr)
    end
    print_sanity_results(OpEN::OPEN_E_NONE, last_id > 0, "openapiAclGetNext", "temporary")
    # Delete the ACL the walk stopped on; skipped when the walk found nothing.
    if last_id > 0
      rc = OpEN.openapiAclDelete(@client, last_id)
      print_sanity_results(rc, true, "openapiAclDelete", "temporary")
    end
  end
  OpEN.delete_uint32_tp(id_ptr)
end
def test_acl_rename(acls)
  # Look up each listed ACL by its current name and rename it.
  #
  # acls - list of [acl_type, current_name, new_name] entries.
  #
  # Builds the character buffer and its Open_buffdesc for one string.
  wrap = lambda do |text|
    chars = $open.getCharBuffer(text.length, text)
    desc = OpEN::Open_buffdesc.new
    desc.pstart = chars
    desc.size = text.length + 1
    [chars, desc]
  end
  acls.each do |acl_type, old_name, new_name|
    id_ptr = OpEN.new_uint32_tp()
    # old_chars is not read again; it is held in a local just as the original
    # code held its character buffer.
    old_chars, old_desc = wrap.call(old_name)
    new_chars, new_desc = wrap.call(new_name)
    rc = OpEN.openapiAclGet(@client, acl_type, old_desc, id_ptr)
    acl_id = OpEN.uint32_tp_value(id_ptr)
    print_bad_result(rc, "openapiAclGet - IP ACL")
    if rc == OpEN::OPEN_E_NONE
      rc = OpEN.openapiAclNameGet(@client, acl_id, old_desc)
      print_bad_result(rc, "openapiAclNameGet - IP ACL")
      # Rename only when both the lookup and the name fetch succeeded.
      if rc == OpEN::OPEN_E_NONE
        rc = OpEN.openapiAclRename(@client, acl_id, new_desc)
        print_bad_result(rc, "openapiAclRename - IP ACL")
        print_sanity_results(rc, acl_id > 0, "openapiAclRename", new_chars.cast())
      end
    end
    OpEN.delete_uint32_tp(id_ptr)
  end
end
def test_acl_traffic_filter_create(interfaces)
  # Bind ACLs to interfaces or VLANs as described by the interfaces list.
  #
  # interfaces - list of flat arrays that alternate a String key (INTF or
  #              VLAN) with an Array of
  #              [acl_type, acl_name, interface_or_vlan, direction, sequence].
  #
  # The list is only read here. Plain `each` is used because `collect!` would
  # replace every entry of the caller's list with this block's return value.
  key = ''
  interfaces.each do |interface|
    interface.each do |val|
      case val
      when String
        # Remember the keyword; its argument array is the next entry.
        key = val
      when Array
        type, name, intf, dir, seq = val
        acl_p = OpEN.new_uint32_tp()
        name_string = $open.getCharBuffer(name.length, name)
        name_buff = OpEN::Open_buffdesc.new
        name_buff.pstart = name_string
        name_buff.size = name.length + 1
        # Resolve the ACL name to its id before binding it.
        result = OpEN.openapiAclGet(@client, type, name_buff, acl_p)
        print_bad_result(result, "openapiAclGet - IP ACL")
        if result == OpEN::OPEN_E_NONE
          acl = OpEN.uint32_tp_value(acl_p)
          if key == INTF
            result = OpEN.openapiAclIntfDirAdd(@client, intf, dir, acl, seq)
            disp = "ACL:%s INTF:%d dir:%d seq:%d" % [name_string.cast(), intf, dir, seq]
            print_sanity_results(result, true, "openapiAclIntfDirAdd", disp)
          elsif key == VLAN
            result = OpEN.openapiAclVlanDirAdd(@client, intf, dir, acl, seq)
            disp = "ACL:%s VLAN:%d dir:%d seq:%d" % [name_string.cast(), intf, dir, seq]
            print_sanity_results(result, true, "openapiAclVlanDirAdd", disp)
          end
        end
        OpEN.delete_uint32_tp(acl_p)
      end
    end
  end
end
def test_acl_traffic_filter_delete(interfaces)
  # Remove ACL bindings from interfaces or VLANs as described by the list.
  #
  # interfaces - list of flat arrays that alternate a String key (INTF or
  #              VLAN) with an Array of
  #              [acl_type, acl_name, interface_or_vlan, direction].
  #
  # The list is only read here. Plain `each` is used because `collect!` would
  # replace every entry of the caller's list with this block's return value.
  key = ''
  interfaces.each do |interface|
    interface.each do |val|
      case val
      when String
        # Remember the keyword; its argument array is the next entry.
        key = val
      when Array
        type, name, intf, dir = val
        acl_p = OpEN.new_uint32_tp()
        name_string = $open.getCharBuffer(name.length, name)
        name_buff = OpEN::Open_buffdesc.new
        name_buff.pstart = name_string
        name_buff.size = name.length + 1
        # Resolve the ACL name to its id before removing the binding.
        result = OpEN.openapiAclGet(@client, type, name_buff, acl_p)
        print_bad_result(result, "openapiAclGet - IP ACL")
        if result == OpEN::OPEN_E_NONE
          acl = OpEN.uint32_tp_value(acl_p)
          if key == INTF
            result = OpEN.openapiAclIntfDirDelete(@client, intf, dir, acl)
            disp = "ACL:%s INTF:%d dir:%d" % [name_string.cast(), intf, dir]
            print_sanity_results(result, true, "openapiAclIntfDirDelete", disp)
          elsif key == VLAN
            result = OpEN.openapiAclVlanDirDelete(@client, intf, dir, acl)
            disp = "ACL:%s VLAN:%d dir:%d" % [name_string.cast(), intf, dir]
            print_sanity_results(result, true, "openapiAclVlanDirDelete", disp)
          end
        end
        OpEN.delete_uint32_tp(acl_p)
      end
    end
  end
end
def test_acl_route_filter(acl_type, name)
  # Apply an ACL as a route filter.
  #
  # acl_type - ACL type passed to openapiAclGet for the name lookup.
  # name     - name of the ACL to look up.
  #
  # The route 10.10.10.1 / 255.0.0.0 is run through openapiAclRouteFilter and
  # the sanity check passes when the returned action equals OPEN_ACL_PERMIT.
  name_string = $open.getCharBuffer(name.length, name)
  name_buff = OpEN::Open_buffdesc.new
  name_buff.pstart = name_string
  name_buff.size = name.length + 1
  acl_p = OpEN.new_uint32_tp()
  result = OpEN.openapiAclGet(@client, acl_type, name_buff, acl_p)
  print_bad_result(result, "openapiAclGet - IP ACL")
  if result == OpEN::OPEN_E_NONE
    acl = OpEN.uint32_tp_value(acl_p)
    # Only copy_* is used for the action pointer. A preceding new_* call would
    # be overwritten immediately and its allocation never released.
    action_p = OpEN.copy_OPEN_ACL_ACTION_tp(OpEN::OPEN_ACL_DENY)
    route_prefix = IPAddr.new('10.10.10.1').to_i
    route_mask = IPAddr.new('255.0.0.0').to_i
    result = OpEN.openapiAclRouteFilter(@client, acl, route_prefix, route_mask, action_p)
    print_sanity_results(result, OpEN::OPEN_ACL_PERMIT == OpEN.OPEN_ACL_ACTION_tp_value(action_p), "openapiAclRouteFilter", "permit")
    # Released inside the branch that allocated it, so a failed lookup never
    # hands nil to the delete call.
    OpEN.delete_OPEN_ACL_ACTION_tp(action_p)
  end
  OpEN.delete_uint32_tp(acl_p)
end
def show_interface_summary(dir)
  # Iterate the interface/direction pairs returned by
  # openapiAclIntfDirGetNext and print the ones reported as in use.
  #
  # dir - direction used to seed the iteration (together with interface 0).
  puts "\nACL interface direction assignments"
  puts "Interface Direction"
  puts "--------- ---------"
  intf = 0
  intf_p = OpEN.new_uint32_tp()
  dir_p = OpEN.new_OPEN_ACL_DIRECTION_tp()
  inuse_p = OpEN.new_OPEN_CONTROL_tp()
  while OpEN.openapiAclIntfDirGetNext(@client, intf, dir, intf_p, dir_p) == OpEN::OPEN_E_NONE
    # Step to the pair GetNext just returned *before* testing it. Testing the
    # previous values would probe the seed (interface 0) and never reach the
    # last pair returned.
    intf = OpEN.uint32_tp_value(intf_p)
    dir = OpEN.OPEN_ACL_DIRECTION_tp_value(dir_p)
    # Report only pairs whose in-use query succeeds and says enabled.
    if OpEN.openapiAclIsIntfInUse(@client, intf, dir, inuse_p) == OpEN::OPEN_E_NONE &&
       OpEN.OPEN_CONTROL_tp_value(inuse_p) == OpEN::OPEN_ENABLE
      puts "%9d %s" % [intf, (dir==OpEN::OPEN_ACL_INBOUND_ACL)?"Inbound":"Outbound"]
    end
  end
  OpEN.delete_uint32_tp(intf_p)
  OpEN.delete_OPEN_ACL_DIRECTION_tp(dir_p)
  OpEN.delete_OPEN_CONTROL_tp(inuse_p)
end
def show_interface_details(type, intf, dir)
  # Print the ACLs bound to one VLAN or interface for a given direction.
  #
  # type - VLAN selects the VLAN list API; any other value selects the
  #        interface list API.
  # intf - VLAN id or interface number handed to the list getter.
  # dir  - ACL direction; shown as "Inbound" when it equals
  #        OPEN_ACL_INBOUND_ACL and as "Outbound" otherwise.
  #
  # The table is printed even when the getter reports an error.
  direction = (dir == OpEN::OPEN_ACL_INBOUND_ACL) ? "Inbound" : "Outbound"
  if type == VLAN
    bound = OpEN::OPEN_ACL_VLAN_DIR_LIST_t.new
    rc = OpEN.openapiAclVlanDirListGet(@client, intf, dir, bound)
    print_bad_result(rc, "openapiAclVlanDirListGet")
    puts "\nACL VLAN %d, %s details" % [intf, direction]
    puts "ACL id Type Seq Num"
    puts "------ ---- -------"
    (0..bound.count-1).each do |pos|
      entry = OpEN.OPEN_ACL_VLAN_LIST_INFO_tArray_getitem(bound.listEntry, pos)
      puts "%6d %4d %7d" % [entry.aclId, entry.aclType, entry.seqNum]
    end
  else
    bound = OpEN::OPEN_ACL_INTF_DIR_LIST_t.new
    rc = OpEN.openapiAclIntfDirListGet(@client, intf, dir, bound)
    print_bad_result(rc, "openapiAclIntfDirListGet")
    puts "\nACL Interface %d, %s details" % [intf, direction]
    puts "ACL id Type Seq Num"
    puts "------ ---- -------"
    (0..bound.count-1).each do |pos|
      entry = OpEN.OPEN_ACL_INTF_LIST_INFO_tArray_getitem(bound.listEntry, pos)
      puts "%6d %4d %7d" % [entry.aclId, entry.aclType, entry.seqNum]
    end
  end
end
def show_acl_details(interface_type, acl_type, name, dir)
  # Print the VLANs or interfaces a named ACL is assigned to in one direction.
  #
  # interface_type - VLAN selects the assigned-VLAN list; any other value
  #                  selects the assigned-interface list.
  # acl_type       - ACL type passed to openapiAclGet for the name lookup.
  # name           - ACL name to look up.
  # dir            - ACL direction; shown as "Inbound" when it equals
  #                  OPEN_ACL_INBOUND_ACL and as "Outbound" otherwise.
  name_chars = $open.getCharBuffer(name.length, name)
  name_desc = OpEN::Open_buffdesc.new
  name_desc.pstart = name_chars
  name_desc.size = name.length + 1
  id_ptr = OpEN.new_uint32_tp()
  rc = OpEN.openapiAclGet(@client, acl_type, name_desc, id_ptr)
  acl_id = OpEN.uint32_tp_value(id_ptr)
  print_bad_result(rc, "openapiAclGet - IP ACL")
  # Choose list object, getter, title and member array for the binding kind.
  if interface_type == VLAN
    assigned = OpEN::OPEN_ACL_ASSIGNED_VLAN_LIST_t.new
    getter = "openapiAclAssignedVlanDirListGet"
    title = "\nACL %s, %s VLANs"
    member = :vlan
  else
    assigned = OpEN::OPEN_ACL_ASSIGNED_INTF_LIST_t.new
    getter = "openapiAclAssignedIntfDirListGet"
    title = "\nACL %s, %s Interfaces"
    member = :intf
  end
  rc = OpEN.send(getter, @client, acl_id, dir, assigned)
  print_bad_result(rc, getter)
  direction = (dir == OpEN::OPEN_ACL_INBOUND_ACL) ? "Inbound" : "Outbound"
  puts title % [name_chars.cast(), direction]
  # The id column is printed only when the list getter succeeded.
  if rc == OpEN::OPEN_E_NONE
    puts "Interface"
    puts "---------"
    (0..assigned.count-1).each do |pos|
      puts OpEN.uint32_tArray_getitem(assigned.send(member), pos)
    end
  end
  OpEN.delete_uint32_tp(id_ptr)
end
def show_summary()
  # Query a handful of single-value getters and report each one through
  # print_bad_result / print_sanity_results.
  value_p = OpEN.new_uint32_tp()
  puts
  # Each row is the getter name followed by the format string for its value.
  [
    ["openapiCpuIntfGet", "CPU Control Plane Interface ... %d"],
    ["openapiAclMaxAclIntfCountGet", "Number of allowed ACL Interfaces ... %d"],
    ["openapiAclMaxAclVlanCountGet", "Number of allowed ACL VLANs ... %d"],
    ["openapiAclCountGet", "Total number of configured ACLs ... %d"],
    ["openapiAclMacCountGet", "Number of configured MAC ACLs ... %d"]
  ].each do |getter, template|
    rc = OpEN.send(getter, @client, value_p)
    print_bad_result(rc, getter)
    line = template % OpEN.uint32_tp_value(value_p)
    print_sanity_results(rc, true, getter, line)
  end
  OpEN.delete_uint32_tp(value_p)
end
end
def main()
  # Connect to OpEN, run every ACL example step in order, then disconnect.
  # Prints "Unable to connect" and does nothing else when the connection fails.
  unless $open.connect("acl_example") == OpEN::OPEN_E_NONE
    print "Unable to connect"
    return
  end
  $open.getNetworkOSVersion()
  $open.getAPIVersion()
  example = AclExample.new($open.client)
  # Create, look up and rename ACLs, then bind and unbind them.
  example.test_acl_creation(ACL_LIST)
  example.test_acl_retrieval()
  example.test_acl_rename(ACL_RENAME_LIST)
  example.test_acl_traffic_filter_create(INTERFACE_CREATE_LIST)
  example.test_acl_traffic_filter_delete(INTERFACE_DELETE_LIST)
  ip_type = OpEN::OPEN_ACL_TYPE_IP
  inbound = OpEN::OPEN_ACL_INBOUND_ACL
  example.test_acl_route_filter(ip_type, 'test_ip_renamed')
  # Display what is configured.
  example.show_interface_summary(inbound)
  example.show_interface_details(INTF, 1, inbound)
  example.show_interface_details(VLAN, 6, inbound)
  example.show_acl_details(INTF, ip_type, 'test_ip_renamed', inbound)
  example.show_acl_details(VLAN, ip_type, 'test_ip_renamed', inbound)
  example.show_summary()
  $open.terminate()
end
# Run the example only when this file is executed directly, not when loaded.
main() if __FILE__ == $0