Build a new script last night it works much better but still hits the API limit. 😅 # This is a Python 3 script to count the total unique client MAC addresses connected to MR access points for
# an organization during the last month.
#
# Usage:
# clientcount.py -k <api key> [-o <org name>]
#
# Parameters:
# -k <api key> : Mandatory. Your Meraki Dashboard API key
# -o <org name> : Optional. Name of the organization you want to process. Use keyword "/all" to explicitly
# specify all orgs. Default is "/all"
#
# Example:
# clientcount.py -k 1234 -o "Big Industries Inc"
#
# Notes:
# * In Windows, use double quotes ("") to enter command line parameters containing spaces.
# * This script was built for Python 3.7.1.
# * Depending on your operating system, the command to start python can be either "python" or "python3".
#
# Required Python modules:
# Requests : http://docs.python-requests.org
#
# After installing Python, you can install these additional modules using pip with the following commands:
# pip install requests
#
# Depending on your operating system, the command can be "pip3" instead of "pip".
#
#
import sys, getopt, requests, json, time, datetime, os, sqlite3, threading
from ratelimit import limits
#SECTION: GLOBAL VARIABLES: MODIFY TO CHANGE SCRIPT BEHAVIOUR
#SECTION: GLOBAL VARIABLES AND CLASSES: DO NOT MODIFY
ARG_APIKEY = '' #DO NOT STATICALLY SET YOUR API KEY HERE
ARG_ORGNAME = '' #DO NOT STATICALLY SET YOUR ORGANIZATION NAME HERE
ORG_LIST = None #list of organizations, networks and MRs the used API key has access to
MAX_CLIENT_TIMESPAN = 2592000 #maximum timespan GET clients Dashboard API call supports
class c_Net:
def __init__(self):
id = ''
name = ''
shard = 'api.meraki.com'
devices = []
class c_Organization:
def __init__(self):
id = ''
name = ''
shard = 'api.meraki.com'
nets = []
#SECTION: General use functions
#make it work nice across threads
def RateLimited(max_per_second):
'''
Decorator that make functions not be called faster than
'''
lock = threading.Lock()
minInterval = 1.0 / float(max_per_second)
def decorate(func):
lastTimeCalled = [0.0]
def rateLimitedFunction(*args):
lock.acquire()
elapsed = time.perf_counter() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
lock.release()
ret = func(*args)
lastTimeCalled[0] = time.perf_counter()
return ret
return rateLimitedFunction
return decorate
def printhelp():
print('This is a Python 3 script to count the total unique client MAC addresses connected to MR access points for')
print(' an organization during the last month.')
print('')
print('Usage:')
print(' clientcount.py -k <api key> [-o <org name>]')
print('')
print('Parameters:')
print(' -k <api key> : Mandatory. Your Meraki Dashboard API key')
print(' -o <org name> : Optional. Name of the organization you want to process. Use keyword "/all" to explicitly')
print(' specify all orgs. Default is "/all"')
print('')
print('Example:')
print(' clientcount.py -k 1234 -o "Big Industries Inc"')
print('')
print('Notes:')
print(' * In Windows, use double quotes ("") to enter command line parameters containing spaces.')
#SECTION: Meraki Dashboard API communication functions
@RateLimited(0.3)
def getInventory(p_org):
#returns a list of all networks in an organization
try:
r = requests.get('https://api.meraki.com/api/v0/organizations/%s/inventory' % (p_org.id), headers={'X-Cisco-Meraki-API-Key': ARG_APIKEY, 'Content-Type': 'application/json'} )
except:
print('ERROR 06: Unable to contact Meraki cloud')
return(None)
if r.status_code != requests.codes.ok:
print(f'Rate limited activated - Retrying after {r.headers["Retry-After"]}.')
time.sleep(int(r.headers['Retry-After']))
#return(None)
raise Exception('API response: {}'.format(r.status_code))
return response
return(r.json())
@RateLimited(0.3)
def getNetworks(p_org):
#returns a list of all networks in an organization
try:
r = requests.get('https://api.meraki.com/api/v0/organizations/%s/networks' % (p_org.id), headers={'X-Cisco-Meraki-API-Key': ARG_APIKEY, 'Content-Type': 'application/json'} )
except:
print('ERROR 07: Unable to contact Meraki cloud')
return(None)
if r.status_code != requests.codes.ok:
print(f'Rate limited activated - Retrying after {r.headers["Retry-After"]}.')
time.sleep(int(r.headers['Retry-After']))
#return(None)
raise Exception('API response: {}'.format(r.status_code))
return response
return(r.json())
@RateLimited(0.3)
def getNetworks_clients(p_org, p_dev):
#returns a list of all networks in an organization
try:
r = requests.get('https://api.meraki.com/api/v0/organizations/%s/networks/%s/clients' % (p_org.id, p_dev), headers={'X-Cisco-Meraki-API-Key': ARG_APIKEY, 'Content-Type': 'application/json'} )
except:
print('ERROR 08: Unable to contact Meraki cloud')
return(None)
if r.status_code != requests.codes.ok:
print(f'Rate limited activated - Retrying after {r.headers["Retry-After"]}.')
time.sleep(int(r.headers['Retry-After']))
#return(None)
raise Exception('API response: {}'.format(r.status_code))
return response
return(r.json())
@RateLimited(0.3)
def getNetworks_clients_id(p_org, p_dev, p_id):
#returns a list of all networks in an organization
try:
r = requests.get('https://api.meraki.com/api/v0/organizations/%s/networks/%s/clients/%s' % (p_org.id, p_dev, p_id), headers={'X-Cisco-Meraki-API-Key': ARG_APIKEY, 'Content-Type': 'application/json'} )
except:
print('ERROR 08: Unable to contact Meraki cloud')
return(None)
if r.status_code != requests.codes.ok:
print(f'Rate limited activated - Retrying after {r.headers["Retry-After"]}.')
time.sleep(int(r.headers['Retry-After']))
#return(None)
raise Exception('API response: {}'.format(r.status_code))
return response
return(r.json())
@RateLimited(0.3)
def getOrgs():
#returns the organizations' list for a specified admin, with filters applied
try:
r = requests.get('https://api.meraki.com/api/v0/organizations', headers={'X-Cisco-Meraki-API-Key': ARG_APIKEY, 'Content-Type': 'application/json'} )
except:
print('ERROR 01: Unable to contact Meraki cloud')
return(None)
if r.status_code != requests.codes.ok:
print(f'Rate limited activated - Retrying after {r.headers["Retry-After"]}.')
time.sleep(int(r.headers['Retry-After']))
#return(None)
raise Exception('API response: {}'.format(r.status_code))
return response
rjson = r.json()
orglist = []
listlen = -1
if ARG_ORGNAME.lower() == '/all':
for org in rjson:
orglist.append(c_Organization())
listlen += 1
orglist[listlen].id = org['id']
orglist[listlen].name = org['name']
else:
for org in rjson:
if org['name'] == ARG_ORGNAME:
orglist.append(c_Organization())
listlen += 1
orglist[listlen].id = org['id']
orglist[listlen].name = org['name']
return(orglist)
@RateLimited(0.3)
def getShardHost(p_org):
#Looks up shard URL for a specific org. Use this URL instead of 'api.meraki.com'
# when making API calls with API accounts that can access multiple orgs.
#On failure returns None
try:
r = requests.get('https://api.meraki.com/api/v0/organizations/%s/snmp' % p_org.id, headers={'X-Cisco-Meraki-API-Key': ARG_APIKEY, 'Content-Type': 'application/json'} )
except:
print('ERROR 08: Unable to contact Meraki cloud')
return None
if r.status_code != requests.codes.ok:
print(f'Rate limited activated - Retrying after {r.headers["Retry-After"]}.')
time.sleep(int(r.headers['Retry-After']))
#return(None)
raise Exception('API response: {}'.format(r.status_code))
return response
rjson = r.json()
return(rjson['hostname'])
@RateLimited(0.3)
def refreshOrgList():
global ORG_LIST
cnt_80211b = 0
cnt_80211n = 0
cnt_80211ac = 0
cnt_80211ax = 0
cnt = 0
print('INFO: Starting org list refresh at %s...' % datetime.datetime.now())
orglist = getOrgs()
if not orglist is None:
for org in orglist:
print('INFO: Processing org "%s"' % org.name)
org.shard = 'api.meraki.com'
orgshard = getShardHost(org)
if not orgshard is None:
org.shard = orgshard
netlist = getNetworks(org)
devlist = getInventory(org)
for device in netlist:
if device['name'] is not None:
print (device['name'])
clidev = device['id']
client_listlist = getNetworks_clients (org, clidev)
for client in client_listlist:
if client['ssid'] is not None:
client_id = client['id']
client_spec = getNetworks_clients_id(org, clidev, client_id)
if "802.11b" in client_spec['wirelessCapabilities']:
cnt_80211b = cnt_80211b + 1
elif "802.11n" in client_spec['wirelessCapabilities']:
cnt_80211n = cnt_80211n + 1
elif "802.11ac" in client_spec['wirelessCapabilities']:
cnt_80211ac = cnt_80211ac + 1
elif "802.11ax" in client_spec['wirelessCapabilities']:
cnt_80211ax = cnt_80211ax + 1
print ("802.11b")
print (cnt_80211b)
print ("802.11n")
print (cnt_80211n)
print ("802.11ac")
print (cnt_80211ac)
print ("802.11ax")
print (cnt_80211ax)
LAST_ORGLIST_REFRESH = datetime.datetime.now()
print('INFO: Refresh complete at %s' % LAST_ORGLIST_REFRESH)
return None
#SECTION: main
def main(argv):
global ARG_APIKEY
global ARG_ORGNAME
#initialize command line arguments
ARG_APIKEY = ''
ARG_ORGNAME = ''
arg_numresults = ''
arg_mode = ''
arg_filter = ''
#get command line arguments
try:
opts, args = getopt.getopt(argv, 'hk:o:m:')
except getopt.GetoptError:
printhelp()
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
printhelp()
sys.exit()
elif opt == '-k':
ARG_APIKEY = arg
elif opt == '-o':
ARG_ORGNAME = arg
elif opt == '-m':
arg_mode = arg
#check that all mandatory arguments have been given
if ARG_APIKEY == '':
printhelp()
sys.exit(2)
#set defaults for empty command line arguments
if ARG_ORGNAME == '':
ARG_ORGNAME = '/all'
refreshOrgList()
if __name__ == '__main__':
main(sys.argv[1:])
... View more