# coder-template/python/akamai_functions.py
#
# 551 lines
# 21 KiB
# Python

from akamai.edgegrid import EdgeGridAuth
from urllib.parse import urljoin
from datetime import datetime, timedelta
import requests, json
import re
import time
import names
from flask import current_app
# Control Center Account is 'Marketplace Test01'
# contract_id is appended as the contractId query parameter of the PAPI
# property requests made further down in this module.
contract_id = 'ctr_V-41DUHPB'
# admin user API credential (EdgeGrid client secret / host / tokens used by
# the shared session below)
# NOTE(review): these look like live credentials committed in plain text.
# Consider loading them from the environment or a secret store and rotating
# the values that have been exposed here.
client_secret = 'HP299+k2YnnyRKtbzKYtGOrM4ve1tXlrsn6o3ZZ/Mdw='
host = 'akab-hkgndwdao42uuh4y-q4itpussnn3gck4x.luna.akamaiapis.net'
access_token = 'akab-oaav5wopp546rowg-gjkaonxqkb6suzo7'
client_token = 'akab-4daj4uu4qpqiukly-5kbvowcutmvydqk2'
# Base URL that every request path in this module is joined onto.
baseurl = 'https://' + host
# unused hours of user
# NOTE(review): presumably the default idle threshold that callers pass to
# select_user()/get_groups(); nothing in this module reads this global
# directly (get_groups() takes a parameter of the same name) -- confirm.
unused_hours = 5
# Shared HTTP session, signed with the admin credential above, that the
# module-level helper functions use for their API calls.
sess = requests.Session()
sess.auth = EdgeGridAuth(
client_token=client_token,
client_secret=client_secret,
access_token=access_token
)
def get_all_users():
    """Fetch the UI identities (with their auth grants) visible to the admin session.

    Returns:
        The decoded JSON body of the ``ui-identities`` listing, handed back
        as-is; the HTTP status code is not inspected.
    """
    endpoint = urljoin(
        baseurl, '/identity-management/v3/user-admin/ui-identities?authGrants=true')
    return sess.get(endpoint).json()
def verify_user_assigned_ever(user, firstName, lastName):
    """Tell whether ``user`` already carries the given first and last name.

    Args:
        user: Mapping with ``firstName`` and ``lastName`` keys.
        firstName: First name to compare against.
        lastName: Last name to compare against.

    Returns:
        True when both names match exactly, otherwise False.
    """
    same_first = user['firstName'] == firstName
    # The last name is only looked up when the first name already matched.
    return same_first and user['lastName'] == lastName
def verify_user_email_domain(user):
    """Check that the user's e-mail address belongs to ``akamai-lab.com``.

    Args:
        user: Mapping with an ``email`` key.

    Returns:
        True when the whole address matches the lab-domain pattern, else False.
    """
    lab_address = r'^[a-zA-Z0-9_.+-]+@akamai-lab\.com$'
    return re.fullmatch(lab_address, user['email']) is not None
def verify_user_lastLoginDate(user, hours):
    """Tell whether a user has been idle for more than ``hours`` hours.

    Args:
        user: Mapping that may contain ``lastLoginDate`` formatted as
            ``%Y-%m-%dT%H:%M:%S.%fZ``.
        hours: Idle threshold in hours.

    Returns:
        True when the user has never logged in (no ``lastLoginDate`` key) or
        when the whole number of hours since the last login exceeds
        ``hours``; otherwise False.

    Raises:
        ValueError: If ``lastLoginDate`` does not match the expected format.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    if 'lastLoginDate' not in user:
        # user has never logged in to Control Center yet
        return True

    # The trailing "Z" marks the timestamp as UTC, so it has to be compared
    # with the current UTC time. Comparing it with the naive local clock
    # skews the result by the host's UTC offset.
    login_date = datetime.strptime(
        user['lastLoginDate'], "%Y-%m-%dT%H:%M:%S.%fZ"
    ).replace(tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)

    # Partial hours are truncated before the comparison, as before.
    idle_hours = int((now - login_date).total_seconds() / 3600)
    return idle_hours > hours
def select_user(users, hours, firstName, lastName):
    """Pick a pool user for the person identified by ``firstName``/``lastName``.

    The first pass looks for a user that already carries these names and for
    whom ``verify_user_lastLoginDate`` returns False. If none is found, the
    first lab-domain user for whom that check passes is renamed through
    ``update_user`` and the result of that call is used.

    Args:
        users: Iterable of user mappings.
        hours: Idle threshold forwarded to ``verify_user_lastLoginDate``.
        firstName: First name of the person requesting a user.
        lastName: Last name of the person requesting a user.

    Returns:
        The matching user mapping, or whatever ``update_user`` returned.

    Raises:
        Exception: If neither pass yields a user.
    """
    log = current_app.logger
    log.debug('hours=' + str(hours))

    # To avoid user pool exhaustion, we reuse existing users if they remain in the pool.
    chosen = next(
        (
            candidate
            for candidate in users
            if verify_user_assigned_ever(candidate, firstName, lastName)
            and verify_user_lastLoginDate(candidate, hours) is False
        ),
        None,
    )

    # if there is no existing user in the pool, we search for a random available user
    if chosen is None:
        for candidate in users:
            if not verify_user_email_domain(candidate):
                continue
            if not verify_user_lastLoginDate(candidate, hours):
                continue
            chosen = update_user(candidate['uiIdentityId'], firstName, lastName)
            break

    if chosen is None:
        # we might want to create a new group and a user here???
        raise Exception('cannot find an available user')

    log.debug('selectedUser= %s', chosen)
    return chosen
def reset_user_pwd(u_id):
    """Reset a user's password without sending the notification e-mail.

    Args:
        u_id: The ``uiIdentityId`` string of the user.

    Returns:
        The ``newPassword`` value from the JSON response.
    """
    path = ('/identity-management/v3/user-admin/ui-identities/'
            + u_id + '/reset-password?sendEmail=false')
    reply = sess.post(urljoin(baseurl, path))
    return reply.json()['newPassword']
def update_user(uiIdentityId, firstName, lastName):
    """Overwrite a user's basic info with the given first and last name.

    Every other field in the request (country, phone, contact type, language,
    session timeout, time zone) is sent with fixed values.

    Args:
        uiIdentityId: Identifier string of the user to update.
        firstName: New first name.
        lastName: New last name.

    Returns:
        The decoded JSON response body, regardless of the HTTP status.
    """
    log = current_app.logger
    log.debug('uiIdentityId= %s', uiIdentityId)

    payload = {
        "firstName": firstName,
        "lastName": lastName,
        "country": "USA",
        "phone": "3456788765",
        "contactType": "Billing",
        "preferredLanguage": "English",
        "sessionTimeOut": 64800,
        "timeZone": "GMT",
    }
    log.debug('requestBody= %s', payload)

    target = urljoin(
        baseurl,
        '/identity-management/v3/user-admin/ui-identities/' + uiIdentityId + '/basic-info',
    )
    reply = sess.put(target, json=payload)
    log.debug('update_user status code= %s', reply.status_code)

    result = reply.json()
    log.debug('update_user response body= %s', result)
    return result
def find_credential(authorizedUser, file_path):
    """Look up the stored credentials of one authorized user in a JSON file.

    The file is expected to hold a JSON array of objects, each with an
    ``authorizedUser`` and a ``credentials`` entry.

    Args:
        authorizedUser: Value to match against each entry's ``authorizedUser``.
        file_path: Path of the JSON file to read.

    Returns:
        The ``credentials`` value of the first matching entry, or None when
        no entry matches or the matching entry's credentials are empty.
    """
    # The context manager closes the file even when json.load() raises;
    # the previous bare open() never closed the handle.
    with open(file_path, encoding='utf-8') as handle:
        clients = json.load(handle)

    # NOTE(review): this dumps every stored credential into the debug log.
    current_app.logger.debug(f'clients= {clients}')

    for client in clients:
        if client["authorizedUser"] == authorizedUser:
            credential = client["credentials"]
            break
    else:
        return None

    if not credential:
        return None

    # Routed through the application logger (it used to be print()) so the
    # output follows the same handlers and levels as the rest of the module.
    current_app.logger.debug(f'credentials= {credential}')
    return credential
def create_credential(credential):
    """Create a fresh API credential for the selected user, expiring in 4 hours.

    A session signed with the user's existing credential creates a new
    credential on that user's own API client, then updates the new
    credential's expiration.

    Args:
        credential: Mapping with ``client_secret``, ``host``, ``access_token``
            and ``client_token`` of the selected user.

    Returns:
        A dict with ``client_secret``, ``host``, ``access_token`` and
        ``client_token`` for the new credential, or None when creating the
        credential (status other than 201) or updating its expiration
        (status other than 200) fails.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    # this credential belongs to the selected user
    client_secret = credential["client_secret"]
    host = credential["host"]
    access_token = credential["access_token"]
    client_token = credential["client_token"]
    baseurl = "https://" + host

    # A dedicated session is used so the calls are signed as the selected
    # user rather than with the module-level admin session.
    sess = requests.Session()
    sess.auth = EdgeGridAuth(
        client_token=client_token,
        client_secret=client_secret,
        access_token=access_token,
    )

    # create a new credential for the selected user
    response = sess.post(
        urljoin(baseurl, "/identity-management/v3/api-clients/self/credentials"),
        headers={"Accept": "application/json"},
    )
    s_code = response.status_code
    current_app.logger.debug(f"status code= {s_code}")
    if s_code != 201:
        return None

    body = response.json()
    # NOTE(review): this log line contains the new client secret.
    current_app.logger.debug(f"response body= {body}")
    new_credential = {
        "client_secret": body["clientSecret"],
        "host": host,
        "access_token": access_token,
        "client_token": body["clientToken"],
    }
    credentialId = body["credentialId"]
    current_app.logger.info(f"new credentialId= {credentialId}")

    # if a new credential is created, start updating its expiration.
    # The timestamp is sent with a "Z" (UTC) suffix, so it must be computed
    # from the UTC clock. Using local time shifted the real expiry by the
    # host's UTC offset.
    current = datetime.now(timezone.utc)
    current_app.logger.debug(f"current= {current}")
    expiration = current + timedelta(hours=4)
    current_app.logger.debug(f"expiration= {expiration}")
    payload = {
        "expiresOn": expiration.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "status": "ACTIVE",
        "description": "Expiration 4 hours",
    }
    current_app.logger.debug(f"payload= {payload}")

    path = "/identity-management/v3/api-clients/self/credentials/{}".format(credentialId)
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    # update the new credential expiration
    response = sess.put(urljoin(baseurl, path), headers=headers, json=payload)
    s_code = response.status_code
    current_app.logger.debug(f"status code= {s_code}")
    if s_code != 200:
        # NOTE(review): the credential created above stays in place here.
        return None

    body = response.json()
    current_app.logger.debug(f"update credential response= {body}")
    return new_credential
#########################################################
# The following functions are for administrator only!!! #
#########################################################
def is_user_in_users(users, uiUserName):
    """Report whether a user name exists among all users.

    Args:
        users: Not used; see the note below.
        uiUserName: Value compared against each user's ``uiUserName``.

    Returns:
        True when a user with that ``uiUserName`` is found, otherwise False.
    """
    # NOTE(review): the ``users`` argument is discarded and replaced by a
    # fresh get_all_users() call. Kept as-is because callers may rely on the
    # refetch -- confirm whether the parameter is meant to be honoured.
    users = get_all_users()
    for candidate in users:
        if candidate['uiUserName'] == uiUserName:
            current_app.logger.debug('user %s exists in all users', uiUserName)
            return True

    current_app.logger.debug(
        'cannot find user %s in all users', uiUserName)
    return False
#########################################################
# functions for deleting old properties #
#########################################################
def get_properties_by_group(group_id):
    """List the properties of one group under the module's contract.

    Args:
        group_id: Group identifier; non-string values are converted with
            ``str()`` before being placed in the query string.

    Returns:
        The decoded JSON response body; the HTTP status is not inspected.
    """
    gid = group_id if isinstance(group_id, str) else str(group_id)
    query = '/papi/v1/properties?contractId=' + contract_id + '&groupId=' + gid
    return sess.get(urljoin(baseurl, query)).json()
# this function is used by delete_properties(properties)
def get_property_details(property_id, contract_id, group_id):
    """Fetch the details of a single property.

    Args:
        property_id: Property identifier string.
        contract_id: Contract identifier string.
        group_id: Group identifier string.

    Returns:
        The decoded JSON response body. It is returned for any status code;
        a non-200 status is only logged.
    """
    path = ''.join((
        '/papi/v1/properties/', property_id,
        '?contractId=', contract_id,
        '&groupId=', group_id,
    ))
    reply = sess.get(urljoin(baseurl, path))
    status = reply.status_code
    details = reply.json()

    if status == 200:
        current_app.logger.debug('checked status of property '+property_id)
    else:
        current_app.logger.debug(
            'failed to check status of property '+property_id)
        current_app.logger.debug(details)
    return details
def deactivate_property(property_id, network, property_version):
    """Start deactivating one property version on a network.

    Args:
        property_id: Property identifier string.
        network: ``'STAGING'`` or ``'PRODUCTION'``.
        property_version: Version number that is currently active.

    Returns:
        The ``activationLink`` string from the response when the request is
        accepted (status 201), or None when the request fails.

    Raises:
        werkzeug.exceptions.HTTPException: HTTP 500 (through ``flask.abort``)
            when ``network`` is neither ``'STAGING'`` nor ``'PRODUCTION'``.
    """
    # flask is already a dependency of this module, but only current_app is
    # imported at file level.
    from flask import abort

    if network not in ('STAGING', 'PRODUCTION'):
        # The previous current_app.abort(...) call is not part of the Flask
        # application API; flask.abort() is the supported way to raise the
        # HTTP error.
        abort(
            500,
            description="deactivation target network value is invalid >> network= "
            + str(network),
        )

    deactivate_payload = {
        "acknowledgeAllWarnings": False,
        "activationType": "DEACTIVATE",
        "fastPush": True,
        "ignoreHttpErrors": True,
        "notifyEmails": ["learn@akamai.com"],
        "useFastFallback": False,
        "network": network,
        "propertyVersion": property_version,
    }
    current_app.logger.debug('deactivation target network is ' +
                             str(deactivate_payload['network']))
    current_app.logger.debug(
        'property_version= ' + str(deactivate_payload['propertyVersion']))

    response = sess.post(
        urljoin(baseurl, '/papi/v1/properties/' + property_id + '/activations'),
        json=deactivate_payload,
    )
    s_code = response.status_code
    body = response.json()
    current_app.logger.debug('deactivation response= ' + str(body))

    # Check the status before touching the body: an error response has no
    # 'activationLink', and reading it first turned every failure into a
    # KeyError instead of the logged None result callers test for.
    if s_code != 201:
        current_app.logger.debug('failed to deactivate property ' + property_id)
        current_app.logger.debug(body)
        return None

    activation_link = str(body['activationLink'])
    current_app.logger.debug('started deactivation of property ' +
                             property_id + ' with activationLink ' + activation_link)
    return activation_link
def _wait_for_deactivation(activation_link, done_message, waiting_message):
    """Poll an activation once per second until its deactivation ends.

    Returns True when the status becomes ``DEACTIVATED`` and False when it
    becomes ``FAILED`` or ``ABORTED``.
    """
    while True:
        time.sleep(1)
        response = sess.get(urljoin(baseurl, activation_link))
        s_code = response.status_code
        body = response.json()
        current_app.logger.debug(
            'get activation response code= ' + str(s_code))
        current_app.logger.debug(
            'get activation response body= ' + str(body))
        # Read from the decoded body; the Response object itself is not
        # subscriptable.
        activation = body['activations']['items'][0]
        status = str(activation['status'])
        activation_id = str(activation['activationId'])
        current_app.logger.debug('activation_id ' +
                                 activation_id + ' is ' + status)
        if status == 'DEACTIVATED':
            current_app.logger.info(done_message)
            return True
        # Stop on terminal failure states instead of polling forever.
        # NOTE(review): status names taken from the PAPI activation status
        # enum -- confirm against the API version in use.
        if status in ('FAILED', 'ABORTED'):
            current_app.logger.debug(
                'deactivation ended with status ' + status)
            return False
        current_app.logger.debug(waiting_message)


def _deactivate_on_network(property_id, network, version, done_message, waiting_message):
    """Deactivate ``version`` on ``network`` when it is active, then wait for it."""
    label = network.lower()
    # The version field is an int only while a version is active there.
    if not isinstance(version, int):
        current_app.logger.debug('property ' + property_id +
                                 ' is not active in ' + label + ' network')
        return
    current_app.logger.debug('property ' + property_id + ' version ' +
                             str(version) + ' is active in ' + label + ' network')
    activation_link = deactivate_property(property_id, network, version)
    if activation_link is not None:
        current_app.logger.info(
            'deactivation started in ' + label + ' network. we will wait until it finishes')
        _wait_for_deactivation(activation_link, done_message, waiting_message)


def delete_properties(properties):
    """Deactivate (where active) and delete every property in a listing.

    For each property the details are fetched, any version active in staging
    and then in production is deactivated and waited for, and the property
    is deleted. Processing stops at the first failed delete.

    Args:
        properties: Mapping shaped like the result of
            ``get_properties_by_group``: ``properties['properties']['items']``
            is a list of mappings with ``propertyId`` and ``groupId``.

    Returns:
        True when every listed property was deleted (status 200); False when
        the listing is empty or a delete failed.
    """
    items = properties['properties']['items']
    if not items:
        current_app.logger.debug('cannot find any property')
        return False

    deleted = False
    for prop in items:
        current_app.logger.debug('property= ' + str(prop))
        property_id = prop['propertyId']
        group_id = prop['groupId']
        current_app.logger.debug('property_id= ' + property_id)
        current_app.logger.debug('contract_id= ' + contract_id)
        current_app.logger.debug('group_id= ' + group_id)

        # Check property activation status
        property_details = get_property_details(
            property_id, contract_id, group_id)
        current_app.logger.debug(
            'property_details= ' + str(property_details))
        details = property_details['properties']['items'][0]

        # Deactivate property if it is active in the staging network
        _deactivate_on_network(
            property_id, 'STAGING', details['stagingVersion'],
            'finished deactivation of property ' + property_id + ' in staging network',
            'deactivation is in progress. We will check activation status 1 second later',
        )
        # Deactivate property if it is active in the production network
        _deactivate_on_network(
            property_id, 'PRODUCTION', details['productionVersion'],
            'finished deactivation of property ' + property_id,
            'deactivation is in progress. We will check activation status again 1 second later',
        )

        # Delete the property
        response = sess.delete(
            urljoin(baseurl, '/papi/v1/properties/' + property_id))
        s_code = response.status_code
        body = response.json()
        current_app.logger.debug('delete status code= ' + str(s_code))
        if s_code != 200:
            current_app.logger.debug(
                'failed to delete property ' + property_id)
            current_app.logger.debug(body)
            deleted = False
            break
        deleted = True
        current_app.logger.debug('deleted property ' + property_id)
    return deleted
######################################################
# functions for creating groups and users #
######################################################
def create_users(num):
    """Create ``num`` groups, each with one new user in it.

    Args:
        num: Number of group/user pairs to attempt.

    Returns:
        A list with the response body of every user that was created, or
        None when no user could be created.
    """
    new_users = []
    for _ in range(num):
        group = create_group()
        # create_group() signals failure by returning a message string;
        # indexing that string with 'groupId' used to raise TypeError.
        if not isinstance(group, dict):
            current_app.logger.debug('cannot create a new group: %s', group)
            continue

        groupId = group['groupId']
        groupName = group['groupName']
        uiUserName = groupName + '@akamai-lab.com'
        user = create_user(groupId, groupName)
        if user is not None:
            current_app.logger.debug('user ' + uiUserName + ' is created')
            new_users.append(user)
        else:
            current_app.logger.debug('cannot create a new user')

    return new_users or None
def create_group(parent_groupId='232397'):
    """Create a sub-group named after the current local timestamp.

    Args:
        parent_groupId: Identifier of the parent group the new group is
            created under. Defaults to the value that used to be hard-coded.

    Returns:
        The decoded JSON response body when the group is created (status
        201); otherwise the string ``'cannot create a new group <name>'``.
    """
    # NOTE(review): the name has one-second resolution, so two calls within
    # the same second produce the same group name.
    groupName = str(datetime.now().strftime('%Y%m%d%H%M%S'))
    current_app.logger.debug('groupName= %s', groupName)
    requestBody = {"groupName": groupName}

    response = sess.post(
        urljoin(baseurl, '/identity-management/v3/user-admin/groups/' + parent_groupId),
        json=requestBody,
    )
    s_code = response.status_code
    current_app.logger.debug(
        'create_group status code= ' + str(s_code))
    body = response.json()
    current_app.logger.debug('create_group response body= %s', body)

    # The body is no longer indexed before the status check: reading
    # body['groupId'] from an error response raised KeyError and hid the
    # failure result below.
    if s_code == 201:
        return body
    return 'cannot create a new group ' + str(groupName)
def create_user(groupId, groupName):
    """Create a lab user inside the given group without sending an e-mail.

    The user is created as ``Not Assigned`` with the address
    ``<groupName>@akamai-lab.com`` and a single auth grant on ``groupId``.

    Args:
        groupId: Identifier of the group the user is granted access to.
        groupName: Group name; used as the local part of the e-mail address.

    Returns:
        The decoded JSON response body when the user is created (status
        201), otherwise None.
    """
    log = current_app.logger

    email = groupName + '@akamai-lab.com'
    log.debug('email= '+email)

    payload = {
        "authGrants": [
            {
                # 3 = admin, 928 = editor. refer to 'list_roles.json' for more information.
                "roleId": 928,
                "groupId": groupId,
            }
        ],
        "firstName": 'Not',
        "lastName": 'Assigned',
        "email": email,
        "phone": "(123) 321-1234",
        "additionalAuthentication": "NONE",
        "country": "USA",
    }
    log.debug('create_user request body= %s', payload)

    reply = sess.post(
        urljoin(baseurl, '/identity-management/v3/user-admin/ui-identities?sendEmail=false'),
        json=payload,
        headers={"accept": "application/json"},
    )
    status = reply.status_code
    log.debug('create_user status code= %s', status)
    created = reply.json()
    log.debug('create_user response body= %s', created)

    return created if status == 201 else None
#######################################################
def list_roles():
    """Fetch the roles known to Identity Management.

    Returns:
        The decoded JSON response body when the status is 200; otherwise the
        string ``'cannot list roles'``.
    """
    log = current_app.logger
    reply = sess.get(urljoin(
        baseurl, '/identity-management/v3/user-admin/roles'))

    status = reply.status_code
    log.debug('list_roles status code= %s', status)
    roles = reply.json()
    log.debug('list_roles response body= %s', roles)

    return roles if status == 200 else 'cannot list roles'
def get_groups(unused_hours):
    """Collect the first group of every lab user that is currently idle.

    A user qualifies when the e-mail address is in ``akamai-lab.com`` and the
    user has either never logged in or last logged in more than
    ``unused_hours`` hours ago.

    Args:
        unused_hours: Idle threshold in hours (fractional hours count).

    Returns:
        A list of ``{"groupName": ..., "groupId": ...}`` dicts built from the
        first auth grant of each qualifying user.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    pattern = r'^[a-zA-Z0-9_.+-]+@akamai-lab\.com$'
    current_app.logger.debug('unused_hours='+str(unused_hours))

    groups = []
    for user in get_all_users():
        email = user['email']
        current_app.logger.debug('email= '+email)
        if not re.fullmatch(pattern, email):  # email domain is not akamai-lab.com
            continue

        if 'lastLoginDate' in user:  # the user has logged in to Control Center before
            date = user['lastLoginDate']
            current_app.logger.debug('lastLoginDate='+date)
            # The "Z" suffix marks the timestamp as UTC, so compare it with
            # the UTC clock. The naive local clock skewed the idle time by
            # the host's UTC offset.
            loginDate = datetime.strptime(
                date, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
            currentDate = datetime.now(timezone.utc)
            current_app.logger.debug('now='+str(currentDate))
            difference = (currentDate - loginDate).total_seconds() / 3600
            current_app.logger.debug(
                'hoursSinceLastLogin=' + str(difference)+'hour(s)')
            if not difference > unused_hours:  # logged in too recently
                continue

        # Skip users without any grant instead of failing the whole listing.
        grants = user.get('authGrants') or []
        if not grants:
            current_app.logger.debug('user has no authGrants: '+email)
            continue
        groups.append({
            "groupName": grants[0]['groupName'],
            "groupId": grants[0]['groupId'],
        })
    return groups