Changing asyncio library to use Throttler by default?

PhilipDAth
Kind of a big deal

In every asyncio script I write, I have to override the semaphore with the Throttler class; otherwise I constantly get 429 errors.  My use case is processing many hundreds of async changes (easily 500+).

 

I currently use:

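import throttler

	# Inside an async function. Note that this replaces a private attribute of the SDK session.
	async with meraki.aio.AsyncDashboardAPI(
		output_log=False,
		print_console=False
	) as dashboard:
		# Swap the concurrency semaphore for a rate limiter: at most 10 calls per 1.0 second
		dashboard._session._concurrent_requests_semaphore = throttler.Throttler(rate_limit=10, period=1.0)
		await my_super_code(dashboard)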

 

What does everyone think about changing the actual asyncio Meraki SDK to default to the Throttler rather than using the current semaphore approach?

sungod
Kind of a big deal

Yep, it'd get my vote.

 

The built-in mechanism breaks down when volume increases. Over the years I've tailored scripts to reduce their request rate in various ways, depending on what they were doing.

 

But what I'm seeing with some customers is the use of apps that simply ignore rate limiting and just pound on the API service. For example, regular storms of 50,000+ calls with fewer than 500 succeeding and the rest returning 429s. This has a knock-on impact on all the well-behaved apps.

 

IMO it'd also be good for the API service to detect apps doing this and simply block them for a period (perhaps increasing exponentially), rather than allowing them to exhaust the call budget and block out other apps.

rhbirkelund
Kind of a big deal

Would using Action Batches not take care of some of the calls?

LinkedIn ::: https://blog.rhbirkelund.dk/

Like what you see? - Give a Kudo ## Did it answer your question? - Mark it as a Solution 🙂

All code examples are provided as is. Responsibility for Code execution lies solely your own.
PhilipDAth
Kind of a big deal

The last job I had to do was for a customer who wanted to update an SSID across 1,280 networks.  Some networks used MRs.  Some used an MX with a WiFi module.

 

No templates were in use.  The SSIDs had been manually set up over a period of years and were not even in the same SSID slot in every network.

 

They wanted to update the networks in batches based on tags.  I had to scan every network matching the tag for the SSID and update the existing settings.

 

There were a lot of API calls used just to locate the SSID to be updated.  I could have considered using Action Batches for the last bit, the actual update.  I find using Action Batches a real pain.
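Locating the SSID when it sits in a different slot in each network could look roughly like the sketch below. It assumes the SSID list for a network has already been fetched and is a list of dicts with `number` and `name` keys; `find_ssid_slot` and the sample data are hypothetical and only for illustration.

```python
from typing import Optional


def find_ssid_slot(ssids: list[dict], ssid_name: str) -> Optional[int]:
    """Return the slot number of the SSID with the given name, or None if absent."""
    for ssid in ssids:
        if ssid.get("name") == ssid_name:
            return ssid.get("number")
    return None


# Made-up sample of one network's SSID list (assumed shape: 'number' and 'name' keys).
sample_ssids = [
    {"number": 0, "name": "Guest", "enabled": True},
    {"number": 1, "name": "Unconfigured SSID 2", "enabled": False},
    {"number": 2, "name": "Corp", "enabled": True},
]

slot = find_ssid_slot(sample_ssids, "Corp")
missing = find_ssid_slot(sample_ssids, "Lab")
print(slot, missing)
```

Each network needs one list call before this lookup can run, which is where most of the API budget goes in a job like this.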

rhbirkelund
Kind of a big deal

I won't say that Action Batches are intuitive. It took me a while to get them working, but once I did it was actually okay. Depending on the call, you can do either 20 or 100 actions at a time, so splitting them into bins was necessary. I imagine you could keep a list of these bins, use callbacks to monitor when each one has executed, and update the queue, which is just a list.
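The binning could be sketched roughly like this. `chunk_actions` and the sample queue are hypothetical names for illustration, and the bin size of 20 is taken from the limits mentioned above, so check which limit applies to the call you are making.

```python
from typing import Iterator


def chunk_actions(actions: list[dict], bin_size: int) -> Iterator[list[dict]]:
    """Yield consecutive bins of at most bin_size actions."""
    if bin_size < 1:
        raise ValueError("bin_size must be at least 1")
    for start in range(0, len(actions), bin_size):
        yield actions[start:start + bin_size]


# Hypothetical queue of 45 actions; the resource paths are placeholders.
queue = [
    {"resource": f"/networks/N_{i}/appliance/vlans", "operation": "create", "body": {}}
    for i in range(45)
]

bins = list(chunk_actions(queue, 20))
bin_sizes = [len(b) for b in bins]
print(bin_sizes)
```

Each bin can then be submitted as one batch, and removed from the list once it has been reported as completed.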

rhbirkelund
Kind of a big deal

This is one of the scripts I've made that uses Action Batches. It's not very advanced, but it worked when I had to create a VLAN across multiple networks.

#! /usr/bin/env python3

from datetime import datetime
import configparser
import meraki
import os
import time
import requests

SIMULATE_API = True

#### Settings file
SETTINGSFILE = "properties.ini"

def LoadSettingsFile(p_filename: str):
    settings = configparser.ConfigParser()
    print("## [ LoadSettings ] Reading settings from file:",p_filename)
    # ConfigParser.read() returns the list of files it parsed successfully
    SettingsFiles = settings.read(p_filename)
    if p_filename in SettingsFiles:
        print("## [ LoadSettings ] Done.")
        return settings
    else:
        print("## [ LoadSettings ] Failed.")
        raise SystemError("Settings not loaded")

def GetOrganizationId() -> str:
    print("## [ GetOrganizationId ] Setting Organization Id")
    p_OrganizationId = "..."
    print("## [ GetOrganizationId ] Done.")
    return p_OrganizationId

def GetNetworks(p_DashbordInstance: object, p_OrganizationId: str):
    print("## [ GetNetworks ] Getting list of Networks")
    # Get all the networks for the organizations
    l_AllNetworks = p_DashbordInstance.organizations.getOrganizationNetworks(
        p_OrganizationId, total_pages='all'
    )
    print("## [ GetNetworks ] Done.")
    return l_AllNetworks

def FilterNetworks(p_AllNetworks,p_TargetTags: list[str], p_IgnoredNetworks: list[str], p_Settings):
    # Initialise the lists of networks
    l_NetworkIds = []
    l_NetworkNames = []

    # Run through all the networks and test whether each name is in the list of ignored networks
    print("## [ Checking Names and Tags ] Filtering out networks that are to be ignored")
    for network in p_AllNetworks:
        # Test if the name is in the list of ignored networks
        if network['name'] in p_IgnoredNetworks:
            print("## [ Checking Names and Tags ] Name in ignorelist, skipped;",network['name'])
        elif not any([x in p_TargetTags for x in network['tags']]):
            print("## [ Checking Names and Tags ] Doesn't match any tags, skipped;",network['name'])
        else:
            # If name is not in the list, the network ID is appended to networks of interest
            print("## [ Checking Names and Tags ] Network not filtered, adding to list:",network['name'])
            l_NetworkIds.append(network['id'])
            l_NetworkNames.append(network['name'])
    print("## [ Checking Names and Tags ] Done.")
    return l_NetworkIds

def CheckIfNetworkUsesVlans(p_Dashboard,p_NetworkIds):
    l_NetworkIds = []
    for network_id in p_NetworkIds:
        print("## [ Checking Vlans Enabled ] checking network",network_id,"...")
        response = p_Dashboard.appliance.getNetworkApplianceVlansSettings(
            network_id
        )
        if response['vlansEnabled']:
            print("## [ Checking Vlans Enabled ] Vlans enabled for network",network_id)
            l_NetworkIds.append(network_id)
    return l_NetworkIds


def main():
    dashboard = meraki.DashboardAPI(
        simulate=SIMULATE_API,
        suppress_logging=True,
    )

    Settings = LoadSettingsFile(SETTINGSFILE)
    #Settings = Settings['MX Settings']
            
    org_id = GetOrganizationId()

    IgnoredNetworks = []
    TargetTags = ['VLAN']


    AllNetworks = GetNetworks(dashboard,org_id)
    FilteredNetworks = FilterNetworks(AllNetworks,TargetTags,IgnoredNetworks,Settings)
    VlanEnabledNetworks = CheckIfNetworkUsesVlans(dashboard,FilteredNetworks)
    
    ##### Check if filtered networks already contain the VLAN to be used. If so, those networks are skipped.

    NetworksWithVLanInUse = []
    TargetNetworks = []
    print("## [ xxx1 ] Checking if networks already use Vlan ID",Settings['appliance']['vlanid'])
    for network_id in VlanEnabledNetworks:
        print("## [ xxx1 ] Getting vlans from network:", network_id)
        vlans = dashboard.appliance.getNetworkApplianceVlans(
            network_id
        )
        if any(str(vlan['id']) == Settings['appliance']['vlanid'] for vlan in vlans):
            print("## [ xxx1 ] Vlan already in use, skipping!")
            NetworksWithVLanInUse.append(network_id)
        else:
            print("## [ xxx1 ] Vlan not found, setting vlan details for network and marking as target",network_id)
            applianceIp = "10."+vlans[0]['applianceIp'].split('.')[1]+".255.1"
            subnet = "10."+vlans[0]['applianceIp'].split('.')[1]+".255.0/24"
            TargetNetworks.append({
                "network_id": network_id,
                "applianceIp":applianceIp,
                "subnet":subnet,
                # "applianceIp":vlans[0]['applianceIp'],
                # "subnet":vlans[0]['subnet'],
                })
    print("## [ xxx1 ] Done.")

    
    ##### List VPN enabled subnets
    TempData = []
    for network in TargetNetworks:
        print("## [ xxx2 ] Getting list of VPN enabled subnets for network", network['network_id'])
        SiteToSiteVpn = dashboard.appliance.getNetworkApplianceVpnSiteToSiteVpn(
            network['network_id']
        )
        if SiteToSiteVpn['mode'] == 'none':
            print("## [ xxx2 ] Network not in AutoVPN; skipping..")
            continue
        if Settings["appliance"]['invpn'] == "True":
            print("## [ xxx2 ] Adding new subnet to list of VPN enabled subnets.")
            SiteToSiteVpn['subnets'].append({
                'localSubnet': network['subnet'],
                'useVpn': True
            })
            network.update(SiteToSiteVpn)
            TempData.append(network)
    print("## [ xxx2 ] Done.")
    TargetNetworks = TempData

    ##### Build the list of actions for each network
    actions = []
    for network in TargetNetworks:
        action = {
                "resource": f"/networks/{network['network_id']}/appliance/vlans",
                "operation": "create",
                "body": {
                    "id": Settings['appliance']['vlanid'],
                    "name": Settings['appliance']['vlanname'],
                    "applianceIp": network['applianceIp'],
                    "subnet": network['subnet'],
                    "groupPolicyId": ""
                    }
                }
        actions.append(action)
        action = {
                "resource": f"/networks/{network['network_id']}/appliance/vpn/siteToSiteVpn",
                "operation": "update",
                "body": {
                    "mode": network['mode'],
                    "hubs": network['hubs'],
                    "subnets": network['subnets']
                    },
                }
        # if network['mode'] == "hub":
        #     print("hold")
        actions.append(action)

    ##### ACTION BATCHES
    chunk_size = 2  # number of actions sent in each batch
    actions_chunks = [actions[n:n+chunk_size] for n in range(0,len(actions),chunk_size)]

    url = f"https://api.meraki.com/api/v1/organizations/{org_id}/actionBatches"

    headers = {
        "Authorization": f"Bearer {os.environ['MERAKI_DASHBOARD_API_KEY']}",
        "Content-Type": "application/json",
    }
    for chunk in actions_chunks:
        payload = {
            "confirmed": True,
            "synchronous": True,
            "actions": chunk
        }
        #print(payload)
        response = requests.post(url, json=payload, headers=headers)
        print(response.text)
        print("#####")
        print("#####")
        time.sleep(2)
            
    
if __name__ == "__main__":
    StartTime = datetime.now()
    main()
    print("\nTime to execute:",datetime.now()-StartTime)
    
PhilipDAth
Kind of a big deal

I have saved that example, thank you!

PhilipDAth
Kind of a big deal

@Oren , is this something your team could do?  Failing that, would you consider a pull request?

Oren
Meraki Employee All-Star

Yep. Can you kindly provide an example of a script using aio that makes more than 10 calls/sec?

 

cc: @John_on_API 

PhilipDAth
Kind of a big deal

I don't need to make more than 10 API calls per second.  Everyone in the org running scripts and integrations probably does collectively, but that is not the core issue.

 

The problem is that the current async library uses a semaphore and doesn't pace the API calls out.  Consequently, you commonly run into 429 errors.

 

It lets the system "smash" out 10 API calls immediately - and then chokes the system.  It tends to cause 429 errors.

 

The Throttler() class rate-limits the requests *uniformly* over time.  For example, if you configure it for 10 API calls per second, it makes sure the API calls don't happen more often than once every 100ms.  This stops the API from being smashed with 10 immediate requests.

This stops the 429 errors from happening.
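The difference can be seen in a small self-contained sketch. `UniformPacer` below is a hypothetical stand-in written for this illustration, not the `throttler` package's implementation, and `fake_call` only simulates a request with a short sleep; no Meraki API is involved.

```python
import asyncio
import time


class UniformPacer:
    """Hypothetical limiter: lets callers in no more often than once per interval."""

    def __init__(self, rate_limit: int, period: float = 1.0):
        self._interval = period / rate_limit
        self._next_slot = 0.0
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        async with self._lock:
            now = time.monotonic()
            wait = self._next_slot - now
            if wait > 0:
                await asyncio.sleep(wait)
            # Reserve the following slot one interval after this one.
            self._next_slot = max(now, self._next_slot) + self._interval

    async def __aexit__(self, exc_type, exc, tb):
        return False


async def fake_call(limiter, started: list, t0: float) -> None:
    async with limiter:
        started.append(time.monotonic() - t0)  # record when the "request" begins
        await asyncio.sleep(0.01)  # pretend the request takes 10 ms


async def run(limiter_factory, n: int = 10) -> list:
    limiter = limiter_factory()  # created inside the running event loop
    started = []
    t0 = time.monotonic()
    await asyncio.gather(*(fake_call(limiter, started, t0) for _ in range(n)))
    return started


async def demo():
    burst = await run(lambda: asyncio.Semaphore(10))
    paced = await run(lambda: UniformPacer(rate_limit=10, period=1.0))
    return burst, paced


burst_starts, paced_starts = asyncio.run(demo())
print("semaphore: last call started after", round(max(burst_starts), 3), "s")
print("paced:     last call started after", round(max(paced_starts), 3), "s")
```

With the semaphore, all 10 calls start almost at once. With the pacer, the starts are spread roughly 100ms apart, so the last one begins a little under a second after the first.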

 

I am having to apply this fix frequently.

PhilipDAth
Kind of a big deal

This sample code results in frequent 429 errors.  If you have more than 200 networks, it is pretty much guaranteed to fail.

 

#!/usr/bin/env python3
#

import os,argparse,asyncio,meraki.aio,datetime

# Load global and local Meraki settings such as MERAKI_DASHBOARD_API_KEY
from dotenv import load_dotenv
load_dotenv()
load_dotenv(dotenv_path=os.path.join(os.path.expanduser("~"),".meraki.env"))

# This function retrieves the orgId
async def getOrgId(dashboard,orgName):
	orgId=None

	# Search for the org
	for org in await dashboard.organizations.getOrganizations():
		if org['name'] == orgName:
			orgId=org['id']
			break

	if orgId is None:
		print("Invalid organization name supplied: "+orgName)
		raise SystemExit(-1)

	return orgId

async def test429(dashboard,orgName):
	# Find out the organisation ID
	orgId=await getOrgId(dashboard,orgName)
	
	networks=await dashboard.organizations.getOrganizationNetworks(orgId)
	clientTasks=[dashboard.networks.getNetworkAlertsSettings(net['id']) for net in networks]
	for task in asyncio.as_completed(clientTasks):
		await task

async def main():
	# Meraki parameters
	orgName=None
	
	text="""
	429 async error generator
	"""

	parser = argparse.ArgumentParser(description = text)
	parser.add_argument("-o", "--orgName", help="Meraki org name")

	args=parser.parse_args()
	
	# Grab any arguments defined in the .env file
	orgName=os.getenv("orgName")

	# Apply any overrides from the command line
	if args.orgName: orgName=args.orgName
	
	# Check that we got all the parameters we need
	if not os.getenv("MERAKI_DASHBOARD_API_KEY"):
		print("MERAKI_DASHBOARD_API_KEY must be defined in .meraki.env in your home directory or in .env in the current directory")
		exit(-1)
	if not orgName:
		print("orgName must be defined on the command line, in .meraki.env in your home directory or in .env in the current directory")
		exit(-1)

	async with meraki.aio.AsyncDashboardAPI(
		output_log=False,
		print_console=False
	) as dashboard:
		await test429(dashboard,orgName)
	

if __name__ == "__main__":
	asyncio.run(main())

  

Import the throttler module and change the "async with" block to the one below, and it works with zero errors or warnings.

 

import os,argparse,asyncio,meraki.aio,datetime,throttler

	async with meraki.aio.AsyncDashboardAPI(
		output_log=False,
		print_console=False
	) as dashboard:
		dashboard._session._concurrent_requests_semaphore = throttler.Throttler(rate_limit=10, period=1.0)
		await test429(dashboard,orgName)
PhilipDAth
Kind of a big deal

The API documentation sums up this issue nicely when it describes that a token bucket is used:

https://developer.cisco.com/meraki/api-v1/rate-limit/#rate-limit

The token bucket algorithm is described here:

https://en.wikipedia.org/wiki/Token_bucket#Algorithm

[Image: PhilipDAth_0-1758062026087.png]

 

The tokens are allocated based on time by the Meraki API, but the current Python SDK semaphore approach ignores time, and just allows for 10 API calls to be in flight, regardless of the time.
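A rough simulation of this mismatch is sketched below. The bucket capacity (10), the refill rate (one token per 100ms), and the 100ms round-trip time are assumptions chosen for illustration, and `count_rejections` is a hypothetical helper. Rejected calls are simply counted rather than retried, which is a simplification.

```python
def count_rejections(arrivals_ms, capacity=10, ms_per_token=100):
    """Count requests refused by a token bucket; times are integer milliseconds."""
    full = capacity * ms_per_token
    credit = full  # bucket starts full; credit is measured in ms of refill time
    last = 0
    rejected = 0
    for t in sorted(arrivals_ms):
        credit = min(full, credit + (t - last))  # refill for the elapsed time
        last = t
        if credit >= ms_per_token:
            credit -= ms_per_token  # spend one token
        else:
            rejected += 1  # this call would get a 429
    return rejected


total = 100
latency_ms = 100  # assumed round-trip time per call

# Semaphore(10): ten calls in flight, each wave replaced as soon as it completes.
semaphore_arrivals = [(i // 10) * latency_ms for i in range(total)]
# Paced client: one call every 100 ms regardless of how fast replies come back.
paced_arrivals = [i * 100 for i in range(total)]

semaphore_rejected = count_rejections(semaphore_arrivals)
paced_rejected = count_rejections(paced_arrivals)
print(semaphore_rejected, paced_rejected)
```

Under these assumed numbers the first wave of 10 drains the bucket, and every later wave finds only one token refilled, so 81 of the 100 semaphore-limited calls are rejected while none of the paced calls are.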
