coder-template/build/python/akamai_functions.py

551 lines
21 KiB
Python
Raw Permalink Normal View History

2024-03-28 00:00:44 +00:00
from akamai.edgegrid import EdgeGridAuth
from urllib.parse import urljoin
from datetime import datetime, timedelta
import requests, json
import re
import time
import names
from flask import current_app
import os

# NOTE(review): the literals below are API credentials committed to source
# control. They are kept only as fallbacks so that importing this module keeps
# working unchanged; they should be rotated and supplied exclusively through
# the environment variables read here.

# Control Center Account is 'Marketplace Test01'
contract_id = os.environ.get('AKAMAI_CONTRACT_ID', 'ctr_V-41DUHPB')
# admin user API credential
client_secret = os.environ.get(
    'AKAMAI_CLIENT_SECRET', 'HP299+k2YnnyRKtbzKYtGOrM4ve1tXlrsn6o3ZZ/Mdw=')
host = os.environ.get(
    'AKAMAI_HOST', 'akab-hkgndwdao42uuh4y-q4itpussnn3gck4x.luna.akamaiapis.net')
access_token = os.environ.get(
    'AKAMAI_ACCESS_TOKEN', 'akab-oaav5wopp546rowg-gjkaonxqkb6suzo7')
client_token = os.environ.get(
    'AKAMAI_CLIENT_TOKEN', 'akab-4daj4uu4qpqiukly-5kbvowcutmvydqk2')
baseurl = 'https://' + host
# unused hours of user
unused_hours = 5
# Shared session used by the module-level helpers; every request made through
# it is signed with the admin credential above.
sess = requests.Session()
sess.auth = EdgeGridAuth(
    client_token=client_token,
    client_secret=client_secret,
    access_token=access_token
)
def get_all_users():
    """Fetch the list of UI identities through the shared admin session.

    Sends a GET to the identity-management ``ui-identities`` endpoint with
    ``authGrants=true`` and returns the decoded JSON body unchanged. The HTTP
    status code is not inspected.
    """
    endpoint = urljoin(
        baseurl, '/identity-management/v3/user-admin/ui-identities?authGrants=true')
    return sess.get(endpoint).json()
def verify_user_assigned_ever(user, firstName, lastName):
    """Tell whether *user* already carries the given first and last name.

    Args:
        user: Mapping with ``firstName`` and ``lastName`` keys.
        firstName: First name to compare against.
        lastName: Last name to compare against.

    Returns:
        True only when both names match exactly, otherwise False.
    """
    first_matches = user['firstName'] == firstName
    # ``lastName`` is only looked up when the first name already matched.
    return bool(first_matches and user['lastName'] == lastName)
def verify_user_email_domain(user):
    """Tell whether the user's e-mail address is in the ``akamai-lab.com`` domain.

    Args:
        user: Mapping with an ``email`` key.

    Returns:
        True when the whole address matches the lab-domain pattern, else False.
    """
    lab_address = r'^[a-zA-Z0-9_.+-]+@akamai-lab\.com$'
    return re.fullmatch(lab_address, user['email']) is not None
def verify_user_lastLoginDate(user, hours):
    """Tell whether *user* has been idle for more than *hours* hours.

    Args:
        user: Mapping for one identity. When it has a ``lastLoginDate`` key,
            the value is parsed with the format ``%Y-%m-%dT%H:%M:%S.%fZ``.
        hours: Idle threshold in hours.

    Returns:
        True when the user has no ``lastLoginDate`` at all, or when the number
        of whole hours since the last login is greater than *hours*;
        False otherwise.

    Raises:
        ValueError: If ``lastLoginDate`` does not match the expected format.
    """
    # Imported here because the module header only imports datetime/timedelta.
    from datetime import timezone

    if 'lastLoginDate' not in user:
        # user has never logged in to Control Center yet
        return True

    login_date = datetime.strptime(
        user['lastLoginDate'], "%Y-%m-%dT%H:%M:%S.%fZ")
    # The trailing "Z" marks the timestamp as UTC, so it must be compared with
    # the current UTC time; local time would skew the result by the host's
    # UTC offset.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    # Truncated to whole hours, as before.
    idle_hours = int((now_utc - login_date).total_seconds() / 3600)
    return idle_hours > hours
def select_user(users, hours, firstName, lastName):
    """Pick a pool user for the person identified by *firstName*/*lastName*.

    Two passes are made over *users*:

    1. Reuse: the first user that already carries the given names and for
       which ``verify_user_lastLoginDate`` returns False is returned as-is.
    2. Otherwise the first user that passes both ``verify_user_email_domain``
       and ``verify_user_lastLoginDate`` is renamed through ``update_user``,
       and the response body of that update is returned.

    Raises:
        Exception: If neither pass yields a user.
    """
    current_app.logger.debug('hours='+str(hours))
    chosen = None

    # To avoid user pool exhaustion, we reuse existing users if they remain in the pool.
    for candidate in users:
        if not verify_user_assigned_ever(candidate, firstName, lastName):
            continue
        if verify_user_lastLoginDate(candidate, hours) is False:
            chosen = candidate
            break

    # if there is no existing user in the pool, we search for a random available user
    if chosen is None:
        for candidate in users:
            if verify_user_email_domain(candidate) and verify_user_lastLoginDate(candidate, hours):
                chosen = update_user(
                    candidate['uiIdentityId'], firstName, lastName)
                break

    if chosen is None:
        # we might want to create a new group and a user here???
        raise Exception('cannot find an available user')

    current_app.logger.debug('selectedUser= %s', chosen)
    return chosen
def reset_user_pwd(u_id):
    """Reset the password of identity *u_id* and return the new password.

    Posts to the identity's ``reset-password`` endpoint with
    ``sendEmail=false`` and returns the ``newPassword`` field of the JSON
    response. The HTTP status code is not checked.
    """
    reset_path = ('/identity-management/v3/user-admin/ui-identities/'
                  + u_id + '/reset-password?sendEmail=false')
    reply = sess.post(urljoin(baseurl, reset_path))
    return reply.json()['newPassword']
def update_user(uiIdentityId, firstName, lastName):
    """Overwrite the basic profile of an identity with the given names.

    Sends a PUT to the identity's ``basic-info`` endpoint. Everything except
    the first and last name is a fixed value. The decoded JSON response is
    returned whatever the HTTP status code is.
    """
    current_app.logger.debug('uiIdentityId= %s', uiIdentityId)
    profile = {
        "firstName": firstName,
        "lastName": lastName,
        "country": "USA",
        "phone": "3456788765",
        "contactType": "Billing",
        "preferredLanguage": "English",
        "sessionTimeOut": 64800,
        "timeZone": "GMT"
    }
    current_app.logger.debug('requestBody= %s', profile)

    target = urljoin(
        baseurl,
        '/identity-management/v3/user-admin/ui-identities/' + uiIdentityId + '/basic-info')
    reply = sess.put(target, json=profile)
    current_app.logger.debug('update_user status code= %s', reply.status_code)

    result = reply.json()
    current_app.logger.debug('update_user response body= %s', result)
    return result
def find_credential(authorizedUser, file_path):
    """Look up the stored credentials of *authorizedUser* in a JSON file.

    Args:
        authorizedUser: Value compared against each entry's
            ``authorizedUser`` field.
        file_path: Path of a JSON file holding a list of client entries, each
            with ``authorizedUser`` and ``credentials`` keys.

    Returns:
        The ``credentials`` value of the first matching entry, or None when no
        entry matches or the matching entry's credentials are empty.
    """
    # Context manager so the file handle is released even if parsing fails.
    with open(file_path) as client_file:
        clients = json.load(client_file)
    # NOTE(review): this log line and the print below write credential
    # material to the log/stdout; consider redacting.
    current_app.logger.debug(f'clients= {clients}')

    credential = []
    for client in clients:
        if client["authorizedUser"] == authorizedUser:
            credential = client["credentials"]
            break

    if not credential:
        return None
    print(f'credentials= {credential}')
    return credential
def create_credential(credential):
    """Create a new API credential for the client that owns *credential*.

    The existing credential is used to sign two requests: a POST that creates
    a new credential for the same API client, then a PUT that sets the new
    credential's expiration to four hours from now.

    Args:
        credential: Mapping with ``client_secret``, ``host``, ``access_token``
            and ``client_token`` keys. It belongs to the selected user.

    Returns:
        A dict with ``client_secret``, ``host``, ``access_token`` and
        ``client_token``; host and access token are copied from *credential*,
        secret and client token come from the create response. None is
        returned when the create call does not answer 201 or the update call
        does not answer 200.
    """
    # Imported here because the module header only imports datetime/timedelta.
    from datetime import timezone

    # this credential belongs to the selected user
    user_client_secret = credential["client_secret"]
    user_host = credential["host"]
    user_access_token = credential["access_token"]
    user_client_token = credential["client_token"]
    user_baseurl = "https://" + user_host

    new_credential = {
        "client_secret": "",
        "host": user_host,
        "access_token": user_access_token,
        "client_token": ""
    }

    # The session is only needed for these two calls, so close it afterwards.
    with requests.Session() as user_sess:
        user_sess.auth = EdgeGridAuth(
            client_token=user_client_token,
            client_secret=user_client_secret,
            access_token=user_access_token
        )

        # create a new credential for the selected user
        response = user_sess.post(
            urljoin(user_baseurl, "/identity-management/v3/api-clients/self/credentials"),
            headers={"Accept": "application/json"})
        s_code = response.status_code
        current_app.logger.debug(f"status code= {s_code}")
        if s_code != 201:
            return None

        body = response.json()
        # NOTE(review): this logs the freshly issued client secret.
        current_app.logger.debug(f"response body= {body}")
        new_credential["client_secret"] = body["clientSecret"]
        new_credential["client_token"] = body["clientToken"]
        credentialId = body["credentialId"]
        current_app.logger.info(f"new credentialId= {credentialId}")

        # if a new credential is created, start updating its expiration
        # The timestamp is sent with a literal "Z" (UTC) suffix, so it has to
        # be derived from the current UTC time, not from local time.
        current = datetime.now(timezone.utc).replace(tzinfo=None)
        current_app.logger.debug(f"current= {current}")
        expiration = current + timedelta(hours=4)
        current_app.logger.debug(f"expiration= {expiration}")
        payload = {
            "expiresOn": expiration.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            "status": "ACTIVE",
            "description": "Expiration 4 hours"
        }
        current_app.logger.debug(f"payload= {payload}")

        path = "/identity-management/v3/api-clients/self/credentials/{}".format(credentialId)
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"}
        # update the new credential expiration
        response = user_sess.put(
            urljoin(user_baseurl, path), headers=headers, json=payload)
        s_code = response.status_code
        current_app.logger.debug(f"status code= {s_code}")
        if s_code != 200:
            return None

        body = response.json()
        current_app.logger.debug(f"update credential response= {body}")
        return new_credential
#########################################################
# The following functions are for administrator only!!! #
#########################################################
def is_user_in_users(users, uiUserName):
    """Tell whether a user named *uiUserName* exists among all users.

    NOTE(review): the ``users`` argument is not used; it is replaced by a
    fresh ``get_all_users()`` result before the search. Confirm whether
    callers expect their own list to be searched.

    Returns:
        True when an entry's ``uiUserName`` equals *uiUserName*, else False.
    """
    users = get_all_users()
    found = any(entry['uiUserName'] == uiUserName for entry in users)
    if found:
        current_app.logger.debug('user %s exists in all users', uiUserName)
    else:
        current_app.logger.debug(
            'cannot find user %s in all users', uiUserName)
    return found
#########################################################
# functions for deleting old properties #
#########################################################
def get_properties_by_group(group_id):
    """List the properties of one group under the module's contract.

    Args:
        group_id: Group identifier; non-string values are converted with
            ``str`` before being placed in the query string.

    Returns:
        The decoded JSON body of the PAPI ``/properties`` response. The HTTP
        status code is not checked.
    """
    group_ref = group_id if isinstance(group_id, str) else str(group_id)
    query = '/papi/v1/properties?contractId=' + contract_id + '&groupId=' + group_ref
    return sess.get(urljoin(baseurl, query)).json()
# Helper for delete_properties(properties).
def get_property_details(property_id, contract_id, group_id):
    """Fetch one property's details from PAPI.

    The decoded JSON body is returned for every status code; the status only
    decides which debug messages are logged.
    """
    detail_path = ('/papi/v1/properties/' + property_id
                   + '?contractId=' + contract_id + '&groupId=' + group_id)
    reply = sess.get(urljoin(baseurl, detail_path))
    status = reply.status_code
    details = reply.json()

    if status == 200:
        current_app.logger.debug('checked status of property '+property_id)
    else:
        current_app.logger.debug(
            'failed to check status of property '+property_id)
        current_app.logger.debug(details)
    return details
def deactivate_property(property_id, network, property_version):
    """Start deactivating one property version on the given network.

    Args:
        property_id: Identifier placed in the activations URL.
        network: ``'STAGING'`` or ``'PRODUCTION'``.
        property_version: Version number sent as ``propertyVersion``.

    Returns:
        The ``activationLink`` of the response as a string when the request
        answers 201, otherwise None.

    Raises:
        werkzeug.exceptions.HTTPException: A 500 abort when *network* is
            neither ``'STAGING'`` nor ``'PRODUCTION'``.
    """
    # ``flask.abort`` is the documented helper; the application object has no
    # ``abort`` method, so the previous ``current_app.abort`` call could not
    # produce the intended 500 response.
    from flask import abort

    if network not in ('STAGING', 'PRODUCTION'):
        abort(
            500,
            description="deactivation target network value is invalid >> network= "+str(network))

    deactivate_payload = {
        "acknowledgeAllWarnings": False,
        "activationType": "DEACTIVATE",
        "fastPush": True,
        "ignoreHttpErrors": True,
        "notifyEmails": ["learn@akamai.com"],
        "useFastFallback": False,
        "network": network,
        "propertyVersion": property_version
    }
    current_app.logger.debug('deactivation target network is ' +
                             str(deactivate_payload['network']))
    current_app.logger.debug(
        'property_version= '+str(deactivate_payload['propertyVersion']))

    response = sess.post(
        urljoin(baseurl, '/papi/v1/properties/' + property_id + '/activations'),
        json=deactivate_payload)
    s_code = response.status_code
    body = response.json()
    current_app.logger.debug('deactivation response= '+str(body))

    if s_code != 201:
        # Only a 201 body is expected to carry an activationLink, so check the
        # status first and report the failure instead of raising KeyError.
        current_app.logger.debug('failed to deactivate property '+property_id)
        current_app.logger.debug(body)
        return None

    activation_link = str(body['activationLink'])
    current_app.logger.debug('started deactivation of property ' +
                             property_id + ' with activationLink '+activation_link)
    return activation_link
def _wait_for_deactivation(activation_link, finished_message, in_progress_message):
    """Poll *activation_link* once per second until the deactivation ends.

    Returns True once the activation reports ``DEACTIVATED`` and False when it
    reports ``FAILED`` or ``ABORTED``.
    """
    while True:
        time.sleep(1)
        response = sess.get(urljoin(baseurl, activation_link))
        s_code = response.status_code
        body = response.json()
        current_app.logger.debug(
            'get activation response code= '+str(s_code))
        current_app.logger.debug(
            'get activation response body= '+str(body))
        activation = body['activations']['items'][0]
        status = str(activation['status'])
        activation_id = str(activation['activationId'])
        current_app.logger.debug('activation_id ' +
                                 activation_id+' is '+status)
        if status == 'DEACTIVATED':
            current_app.logger.info(finished_message)
            return True
        if status in ('FAILED', 'ABORTED'):
            # These statuses will never turn into DEACTIVATED; stop polling
            # instead of looping forever.
            current_app.logger.debug(
                'deactivation ended with status '+status)
            return False
        current_app.logger.debug(in_progress_message)


def delete_properties(properties):
    """Deactivate and delete every property in a PAPI property listing.

    For each item the property details are fetched. If an integer
    ``stagingVersion`` / ``productionVersion`` is reported, that version is
    deactivated on the matching network and the function waits for the
    deactivation to end. The property is then deleted. Processing stops at
    the first delete that does not answer 200.

    Args:
        properties: Listing shaped like ``{'properties': {'items': [...]}}``
            where each item has ``propertyId`` and ``groupId``.

    Returns:
        True when the last processed delete answered 200; False when the
        listing is empty or a delete failed.
    """
    deleted = False
    items = properties['properties']['items']
    if len(items) == 0:
        current_app.logger.debug('cannot find any property')
        return deleted

    for prop in items:
        current_app.logger.debug('property= ' + str(prop))
        property_id = prop['propertyId']
        group_id = prop['groupId']
        current_app.logger.debug('property_id= '+property_id)
        current_app.logger.debug('contract_id= '+contract_id)
        current_app.logger.debug('group_id= '+group_id)

        # Check property activation status
        property_details = get_property_details(
            property_id, contract_id, group_id)
        current_app.logger.debug(
            'property_details= '+str(property_details))
        details = property_details['properties']['items'][0]

        # Deactivate property if it is active in the staging network
        staging_version = details['stagingVersion']
        if isinstance(staging_version, int):
            current_app.logger.debug('property '+property_id+' version ' +
                                     str(staging_version)+' is active in staging network')
            activation_link = deactivate_property(
                property_id, 'STAGING', staging_version)
            if activation_link is not None:
                current_app.logger.info(
                    'deactivation started in staging network. we will wait until it finishes')
                _wait_for_deactivation(
                    activation_link,
                    'finished deactivation of property '+property_id + ' in staging network',
                    'deactivation is in progress. We will check activation status 1 second later')
        else:
            current_app.logger.debug('property '+property_id +
                                     ' is not active in staging network')

        # Deactivate property if it is active in the production network
        production_version = details['productionVersion']
        if isinstance(production_version, int):
            current_app.logger.debug('property '+property_id+' version ' +
                                     str(production_version)+' is active in production network')
            activation_link = deactivate_property(
                property_id, 'PRODUCTION', production_version)
            if activation_link is not None:
                current_app.logger.info(
                    'deactivation started in production network. we will wait until it finishes')
                _wait_for_deactivation(
                    activation_link,
                    'finished deactivation of property '+property_id,
                    'deactivation is in progress. We will check activation status again 1 second later')
        else:
            current_app.logger.debug('property '+property_id +
                                     ' is not active in production network')

        # Delete properties
        response = sess.delete(
            urljoin(baseurl, '/papi/v1/properties/'+property_id))
        s_code = response.status_code
        body = response.json()
        current_app.logger.debug('delete status code= '+str(s_code))
        if s_code != 200:
            current_app.logger.debug(
                'failed to delete property '+property_id)
            current_app.logger.debug(body)
            deleted = False
            break
        deleted = True
        current_app.logger.debug('deleted property '+property_id)

    return deleted
######################################################
# functions for creating groups and users #
######################################################
def create_users(num):
    """Create *num* groups, each with one user named after its group.

    For every iteration ``create_group()`` is called and, when it yields a
    group, ``create_user()`` is called with that group's id and name.

    Args:
        num: Number of group/user pairs to attempt.

    Returns:
        The list of created user bodies, or None when no user was created.
    """
    new_users = []
    for _ in range(num):
        group = create_group()
        if not isinstance(group, dict):
            # create_group() reports failure with a message string; indexing
            # it like a group would raise TypeError.
            current_app.logger.debug('cannot create a new group: %s', group)
            continue

        groupId = group['groupId']
        groupName = group['groupName']
        uiUserName = groupName + '@akamai-lab.com'
        user = create_user(groupId, groupName)
        if user is not None:
            current_app.logger.debug('user '+uiUserName+' is created')
            new_users.append(user)
        else:
            current_app.logger.debug('cannot create a new user')

    return new_users if new_users else None
def create_group(parent_groupId='232397'):
    """Create a sub-group named after the current timestamp.

    The group name is the local time formatted as ``%Y%m%d%H%M%S``, so two
    calls within the same second produce the same name.

    Args:
        parent_groupId: Id of the group under which the new group is created.
            Defaults to the parent group previously hard-coded here.

    Returns:
        The decoded response body when the request answers 201; otherwise the
        string ``'cannot create a new group <groupName>'``.
    """
    groupName = str(datetime.now().strftime('%Y%m%d%H%M%S'))
    current_app.logger.debug('groupName= %s', groupName)
    requestBody = {"groupName": groupName}

    response = sess.post(
        urljoin(baseurl,
                '/identity-management/v3/user-admin/groups/'+str(parent_groupId)),
        json=requestBody)
    s_code = response.status_code
    current_app.logger.debug(
        'create_group status code= '+str(s_code))
    body = response.json()
    current_app.logger.debug('create_group response body= %s', body)

    # The body is not inspected before the status check: reading 'groupId'
    # from an error body raised KeyError and hid the failure message below.
    if s_code == 201:
        return body
    return 'cannot create a new group ' + str(groupName)
def create_user(groupId, groupName):
    """Create an unassigned lab user inside the given group.

    The user's e-mail address is ``<groupName>@akamai-lab.com``, the name is
    the placeholder "Not Assigned", and one auth grant with role 928 is given
    on *groupId*. The request is sent with ``sendEmail=false``.

    Returns:
        The decoded response body when the request answers 201, else None.
    """
    email = groupName + '@akamai-lab.com'
    current_app.logger.debug('email= '+email)

    new_identity = {
        "authGrants": [
            {
                # 3 = admin, 928 = editor. refer to 'list_roles.json' for more information.
                "roleId": 928,
                "groupId": groupId
            }
        ],
        # Placeholder name; select_user() later renames a user via update_user().
        "firstName": 'Not',
        "lastName": 'Assigned',
        "email": email,
        "phone": "(123) 321-1234",
        "additionalAuthentication": "NONE",
        "country": "USA"
    }
    current_app.logger.debug('create_user request body= %s', new_identity)

    endpoint = urljoin(
        baseurl, '/identity-management/v3/user-admin/ui-identities?sendEmail=false')
    reply = sess.post(
        endpoint, json=new_identity, headers={"accept": "application/json"})
    status = reply.status_code
    current_app.logger.debug('create_user status code= %s', status)

    created = reply.json()
    current_app.logger.debug('create_user response body= %s', created)
    return created if status == 201 else None
#######################################################
def list_roles():
    """Fetch the role list from the identity-management API.

    Returns:
        The decoded response body when the request answers 200, otherwise the
        string ``'cannot list roles'``.
    """
    reply = sess.get(
        urljoin(baseurl, '/identity-management/v3/user-admin/roles'))
    status = reply.status_code
    current_app.logger.debug('list_roles status code= %s', status)

    roles = reply.json()
    current_app.logger.debug('list_roles response body= %s', roles)
    return roles if status == 200 else 'cannot list roles'
def get_groups(unused_hours):
    """Collect the groups of lab users that are currently unused.

    A user counts as unused when the e-mail address is in the
    ``akamai-lab.com`` domain and the user either has no ``lastLoginDate`` or
    last logged in more than *unused_hours* hours ago.

    Args:
        unused_hours: Idle threshold in hours (fractional hours are compared
            without rounding).

    Returns:
        A list of ``{"groupName": ..., "groupId": ...}`` dicts taken from the
        first auth grant of every unused user.
    """
    # Imported here because the module header only imports datetime/timedelta.
    from datetime import timezone

    pattern = re.compile(r'^[a-zA-Z0-9_.+-]+@akamai-lab\.com$')
    current_app.logger.debug('unused_hours='+str(unused_hours))
    groups = []

    for user in get_all_users():
        email = user['email']
        current_app.logger.debug('email= '+email)
        if not pattern.fullmatch(email):  # email domain is not akamai-lab.com
            continue

        if 'lastLoginDate' in user:  # the user has logged in to Control Center before
            date = user['lastLoginDate']
            loginDate = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ")
            current_app.logger.debug('lastLoginDate='+date)
            # The timestamp carries a "Z" (UTC) suffix, so compare it with
            # the current UTC time rather than with local time.
            currentDate = datetime.now(timezone.utc).replace(tzinfo=None)
            current_app.logger.debug('now='+str(currentDate))
            difference = (currentDate - loginDate).total_seconds() / 3600
            current_app.logger.debug(
                'hoursSinceLastLogin=' + str(difference)+'hour(s)')
            if not difference > unused_hours:  # logged in too recently
                continue

        grant = user['authGrants'][0]
        groups.append({
            "groupName": grant['groupName'],
            "groupId": grant['groupId'],
        })
    return groups