Having problems with asyncio and 429 errors

PhilipDAth
Kind of a big deal


I have been tasked with creating a list of every client in an org that has been whitelisted.  I think there could be between 200,000 and 300,000 clients.

 

My plan of attack was to iterate through every network, use getNetworkClients to get every client, and then call getNetworkClientPolicy on each client to see if it is whitelisted.

 

If I drop the concurrency to just 1, the code snippet below works fine (but is very slow):

async with meraki.aio.AsyncDashboardAPI(
    output_log=False,
    print_console=False,
    maximum_concurrent_requests=1
) as dashboard:
    await searchOrg(dashboard, orgName)
...
# one checkPolicy task per client in the network, drained as they complete
clientTasks = [
    checkPolicy(dashboard, net['id'], client)
    for client in await dashboard.networks.getNetworkClients(
        net['id'], total_pages='all', timespan=1 * 86400)
]
for task in asyncio.as_completed(clientTasks):
    await task
...
async def checkPolicy(dashboard, netId, client):
    # print any client whose policy is Whitelisted, network-wide or per SSID
    policies = await dashboard.networks.getNetworkClientPolicy(netId, client['id'])
    if policies['devicePolicy'] == 'Whitelisted':
        print(f"{client['mac']},{client['ip']},{client['description']}")
    elif policies['devicePolicy'] == 'Different policies by SSID':
        print(f"{client['mac']},{client['ip']},{client['description']},{policies['ssids']}")

 

If I change maximum_concurrent_requests from 1 to 2 I start getting a reasonable number of 429 warnings, but it does run. If I change it to 5 I get a huge number of 429s, and this error gets thrown after a short amount of processing time:

meraki.exceptions.AsyncAPIError: networks, getNetworkClientPolicy - 429 Too Many Requests, Reached retry limit: None

 

I don't understand why, with a limit of just 2, I am hitting any limit and getting 429s. And if the retry limit is "None", why is it throwing an exception?

 

 

I really want the concurrency to be at least 5. Any ideas?

8 REPLIES
sungod
Head in the Cloud

In the async with block, add the parameters to wait on rate limit and set a high retry count. This is how I do it, and it seems reliable.

 

The library will start adding waits before retrying, with the result that it converges on running at about the natural rate limit.

 

    async with meraki.aio.AsyncDashboardAPI(
        api_key=API_KEY,
        base_url='https://api.meraki.com/api/v1/',
        print_console=False,
        output_log=False,
        suppress_logging=True,
        wait_on_rate_limit=True,
        maximum_retries=100
    ) as aiomeraki:

 

LearningIsFun
Getting noticed

I have had luck using a Throttler, but you sometimes have to play with the timers.

It will also depend on whether you have other scripts running against the API in your org, which will impact your rate limit.

 

from asyncio_throttle import Throttler

# Used to control the flow of API calls to the API interface
throttler = Throttler(rate_limit=16, period=1)

# snip of code

async def gather_ports(aiomeraki, serial):

    # had to use the throttler or it would overrun the API
    async with throttler:
        try:
            switch_ports = await aiomeraki.switch.getDeviceSwitchPorts(serial)

        except meraki.AsyncAPIError as e:
            print(f"Meraki API error: {e}")
            return None
        except Exception as e:
            print(f"some other error: {e}")
            return None
    return switch_ports

 

 

Greenberet
Head in the Cloud

The asyncio API just uses a really dumb semaphore: by default it allows at most 5 requests in flight at the same time.
So if the responses come back really quickly, the script can still hit the rate limit.

 

You could try to overwrite the built-in semaphore with the throttler (it looks really nice, btw. Too bad it hasn't been updated in 2 years 🙄).

 

 

aiomeraki._session._concurrent_requests_semaphore = throttler
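
For what it's worth, this swap presumably works because the library only uses the semaphore as an async context manager around each request, so any object with __aenter__/__aexit__ can stand in. A minimal hand-rolled stand-in might look like this (a sketch of the pattern, not the library's or the throttler's actual code):

import asyncio
import time

class SimpleRateLimiter:
    # Sliding-window limiter: allow at most `rate` entries per `period` seconds
    def __init__(self, rate, period=1.0):
        self.rate = rate
        self.period = period
        self._starts = []

    async def __aenter__(self):
        while True:
            now = time.monotonic()
            # forget entries older than one period
            self._starts = [t for t in self._starts if now - t < self.period]
            if len(self._starts) < self.rate:
                self._starts.append(now)
                return self
            # sleep until the oldest entry ages out
            await asyncio.sleep(self.period - (now - self._starts[0]))

    async def __aexit__(self, exc_type, exc, tb):
        return False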

 

>Too bad it hasn't been updated in 2 years

There are no issues or pull requests. Perhaps it is as simple as nothing being broken, and it does exactly what it needs to do. 🙂

PhilipDAth
Kind of a big deal

Thank you all for your suggestions. I tried several of these approaches and found this one worked best for me.

 

I used this module:
https://github.com/uburuntu/throttler 
pip3 install -U throttler

I added this import:
import throttler

 

And then I used @Greenberet's idea to replace the semaphore:

async with meraki.aio.AsyncDashboardAPI(
    output_log=False,
    print_console=False
) as dashboard:
    # replace the library's internal concurrency semaphore with a
    # rate-based throttler: at most 5 requests per second
    dashboard._session._concurrent_requests_semaphore = throttler.Throttler(rate_limit=5, period=1.0)
    await searchOrg(dashboard, orgName)

 

I get zero 429 errors, no retries, nothing.  It just pumps through the requests at 5 per second.

PhilipDAth
Kind of a big deal

As it turns out - not completely solved. I think I might be running into Python bugs (I'm using 3.11.3). Maybe 200,000 API calls in, I start getting errors like:

[WinError 121] The semaphore timeout period has expired

 

Then the rate of 429s starts going up a lot - but it does still work (I think; it's hard to tell for sure). I've had to bump maximum_retries as suggested by @sungod.

sungod
Head in the Cloud

When there's a 429 response, the library code looks for Retry-After in the response headers and waits for that period.

 

If that isn't in the headers, it waits for a random time between 1 and nginx_429_retry_wait_time seconds (the default is 60; you can specify a different value in the async with block).

 

So the 429s happen, but they don't matter, thanks to the retry mechanism.
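
In other words, roughly this logic (a paraphrase of the behaviour described above, not the library's actual source):

import random

def retry_wait_seconds(response_headers, nginx_429_retry_wait_time=60):
    # honour Retry-After when the API provides it; otherwise pick a
    # random wait between 1 and nginx_429_retry_wait_time seconds
    retry_after = response_headers.get('Retry-After')
    if retry_after is not None:
        return int(retry_after)
    return random.randint(1, nginx_429_retry_wait_time)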

 

If you're issuing a lot of calls at once, they'll get stacked up by the wait/retry, and you might hit some internal resource limit. It might be worth making the calls in batches of, say, 2,000, as sketched below.
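
A minimal version of that batching idea (the helper name and the 2,000 figure are illustrative):

import asyncio

async def run_in_batches(coros, batch_size=2000):
    # await the coroutines a few thousand at a time instead of creating
    # hundreds of thousands of pending requests at once
    for i in range(0, len(coros), batch_size):
        await asyncio.gather(*coros[i:i + batch_size])

# e.g. instead of draining all of clientTasks at once:
# await run_in_batches(clientTasks)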

 

Did you solve your issue, or are you still having problems?

 
