Commit 597a0d1f authored by Tien Mai's avatar Tien Mai Committed by Commit Bot

Adding sample script for using CBCM Takeout API.

This script will query extensions in the domain and
generate a list of machines that have them installed.

Change-Id: Ie70b982ca855b6364b4100f0b42adeee73f0b60a
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2044331
Commit-Queue: Tien Mai <tienmai@chromium.org>
Reviewed-by: default avatarJulian Pastarmov <pastarmovj@chromium.org>
Cr-Commit-Position: refs/heads/master@{#742164}
parent 0aa3f7ec
#!/usr/bin/env python
# Copyright (c) 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Transform CBCM Takeout API Data (Python3)."""
import argparse
import csv
import json
import sys
import google_auth_httplib2
from httplib2 import Http
from google.oauth2.service_account import Credentials
def ComputeExtensionsList(extensions_list, data):
"""Computes list of machines that have an extension.
This sample function processes the |data| retrieved from the Takeout API and
calculates the list of machines that have installed each extension listed in
the data.
Args:
extensions_list: the extension list dictionary to fill.
data: the data fetched from the Takeout API.
"""
for device in data['browsers']:
if 'browsers' not in device:
continue
for browser in device['browsers']:
if 'profiles' not in browser:
continue
for profile in browser['profiles']:
if 'extensions' not in profile:
continue
for extension in profile['extensions']:
key = extension['extensionId']
if 'version' in extension:
key = key + ' @ ' + extension['version']
if key not in extensions_list:
current_extension = {
'name':
extension['name'],
'permissions':
extension['permissions']
if 'permissions' in extension else '',
'installed':
set(),
'disabled':
set(),
'forced':
set()
}
else:
current_extension = extensions_list[key]
machine_name = device['machineName']
current_extension['installed'].add(machine_name)
if 'installType' in extension and extension['installType'] == 3:
current_extension['forced'].add(machine_name)
if 'disabled' in extension and extension['disabled']:
current_extension['disabled'].add(machine_name)
extensions_list[key] = current_extension
def DictToList(data, key_name='id'):
"""Converts a dict into a list.
The value of each member of |data| must also be a dict. The original key for
the value will be inlined into the value, under the |key_name| key.
Args:
data: a dict where every value is a dict
key_name: the name given to the key that is inlined into the dict's values
Yields:
The values from |data|, with each value's key inlined into the value.
"""
assert isinstance(data, dict), '|data| must be a dict'
for key, value in data.items():
assert isinstance(value, dict), '|value| must contain dict items'
value[key_name] = key
yield value
def Flatten(data):
"""Flattens lists inside |data|, one level deep.
This function will flatten each dictionary key in |data| into a single row
so that it can be written to a CSV file.
Args:
data: the data to be flattened.
Yields:
A list of dict objects whose lists or sets have been flattened.
"""
for item in data:
counts = {}
for prop, value in item.items():
if isinstance(value, (list, set)):
item[prop] = ', '.join(sorted(value))
counts['num_' + prop] = len(value)
assert isinstance(item[prop],
(int, bool, str)), ('unexpected type for item: %s' %
type(item[prop]).__name__)
item.update(counts)
yield item
def ExtensionListAsCsv(extensions_list, csv_filename, sort_column='name'):
"""Saves an extensions list to a CSV file.
Args:
extensions_list: an extensions list as returned by ComputeExtensionsList
csv_filename: the name of the CSV file to save
sort_column: the name of the column by which to sort the data
"""
flattened_list = list(Flatten(DictToList(extensions_list)))
fieldnames = flattened_list[0].keys() if flattened_list else []
desired_column_order = [
'id', 'name', 'num_permissions', 'num_installed', 'num_disabled',
'num_forced', 'permissions', 'installed', 'disabled', 'forced'
]
# Order the columns as desired. Columns other than those in
# |desired_column_order| will be in an unspecified order after these columns.
ordered_fieldnames = list(c for c in desired_column_order if c in fieldnames)
ordered_fieldnames.extend(
[x for x in desired_column_order if x not in ordered_fieldnames])
with open(csv_filename, mode='w') as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=ordered_fieldnames)
writer.writeheader()
for row in sorted(flattened_list, key=lambda ext: ext[sort_column]):
writer.writerow(row)
def main(args):
if not args.admin_email:
print('admin_email must be specified.')
sys.exit(1)
if not args.service_account_key_path:
print('service_account_key_path must be specified.')
sys.exit(1)
# Load the json format key that you downloaded from the Google API
# Console when you created your service account. For p12 keys, use the
# from_p12_keyfile method of ServiceAccountCredentials and specify the
# service account email address, p12 keyfile, and scopes.
service_credentials = Credentials.from_service_account_file(
args.service_account_key_path,
scopes=[
'https://www.googleapis.com/auth/admin.directory.device.chromebrowsers'
],
subject=args.admin_email)
try:
http = google_auth_httplib2.AuthorizedHttp(service_credentials, http=Http())
extensions_list = {}
base_request_url = 'https://admin.googleapis.com/admin/directory/v1.1beta1/customer/my_customer/devices/chromebrowsers'
request_parameters = ''
browsers_processed = 0
while True:
print('Making request to server ...')
data = json.loads(
http.request(base_request_url + '?' + request_parameters, 'GET')[1])
browsers_in_data = len(data['browsers'])
print('Request returned %s results, analyzing ...' % (browsers_in_data))
ComputeExtensionsList(extensions_list, data)
browsers_processed += browsers_in_data
if 'nextPageToken' not in data or not data['nextPageToken']:
break
print('%s browsers processed.' % (browsers_processed))
if (args.max_browsers_to_process is not None and
args.max_browsers_to_process <= browsers_processed):
print('Stopping at %s browsers processed.' % (browsers_processed))
break
request_parameters = ('pageToken={}').format(data['nextPageToken'])
finally:
print('Analyze results ...')
ExtensionListAsCsv(extensions_list, args.extension_list_csv)
print("Results written to '%s'" % (args.extension_list_csv))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='CBCM Extension Analyzer')
parser.add_argument(
'-k',
'--service_account_key_path',
metavar='FILENAME',
required=True,
help='The service account key file used to make API requests.')
parser.add_argument(
'-a',
'--admin_email',
required=True,
help='The admin user used to make the API requests.')
parser.add_argument(
'-x',
'--extension_list_csv',
metavar='FILENAME',
default='./extension_list.csv',
help='Generate an extension list to the specified CSV '
'file')
parser.add_argument(
'-m',
'--max_browsers_to_process',
type=int,
help='Maximum number of browsers to process. (Must be > 0).')
args = parser.parse_args()
if (args.max_browsers_to_process is not None and
args.max_browsers_to_process <= 0):
print('max_browsers_to_process must be > 0.')
parser.print_help()
sys.exit(1)
main(args)
#!/usr/bin/env python
# Copyright (c) 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Transform CBCM Takeout API Data (Python2)."""
from __future__ import print_function
import argparse
import csv
import json
import sys
import google_auth_httplib2
from httplib2 import Http
from google.oauth2.service_account import Credentials
def ComputeExtensionsList(extensions_list, data):
"""Computes list of machines that have an extension.
This sample function processes the |data| retrieved from the Takeout API and
calculates the list of machines that have installed each extension listed in
the data.
Args:
extensions_list: the extension list dictionary to fill.
data: the data fetched from the Takeout API.
"""
for device in data['browsers']:
if 'browsers' not in device:
continue
for browser in device['browsers']:
if 'profiles' not in browser:
continue
for profile in browser['profiles']:
if 'extensions' not in profile:
continue
for extension in profile['extensions']:
key = extension['extensionId']
if 'version' in extension:
key = key + ' @ ' + extension['version']
if key not in extensions_list:
current_extension = {
'name':
extension['name'],
'permissions':
extension['permissions']
if 'permissions' in extension else '',
'installed':
set(),
'disabled':
set(),
'forced':
set()
}
else:
current_extension = extensions_list[key]
machine_name = device['machineName']
current_extension['installed'].add(machine_name)
if 'installType' in extension and extension['installType'] == 3:
current_extension['forced'].add(machine_name)
if 'disabled' in extension and extension['disabled']:
current_extension['disabled'].add(machine_name)
extensions_list[key] = current_extension
def DictToList(data, key_name='id'):
"""Converts a dict into a list.
The value of each member of |data| must also be a dict. The original key for
the value will be inlined into the value, under the |key_name| key.
Args:
data: a dict where every value is a dict
key_name: the name given to the key that is inlined into the dict's values
Yields:
The values from |data|, with each value's key inlined into the value.
"""
assert isinstance(data, dict), '|data| must be a dict'
for key, value in data.items():
assert isinstance(value, dict), '|value| must contain dict items'
value[key_name] = key
yield value
def Flatten(data):
"""Flattens lists inside |data|, one level deep.
This function will flatten each dictionary key in |data| into a single row
so that it can be written to a CSV file.
Args:
data: the data to be flattened.
Yields:
A list of dict objects whose lists or sets have been flattened.
"""
for item in data:
counts = {}
for prop, value in item.items():
if isinstance(value, (list, set)):
item[prop] = ', '.join(sorted(value))
counts['num_' + prop] = len(value)
assert isinstance(
item[prop],
(int, bool, str, unicode)), ('unexpected type for item: %s' %
type(item[prop]).__name__)
item.update(counts)
yield item
def ExtensionListAsCsv(extensions_list, csv_filename, sort_column='name'):
"""Saves an extensions list to a CSV file.
Args:
extensions_list: an extensions list as returned by ComputeExtensionsList
csv_filename: the name of the CSV file to save
sort_column: the name of the column by which to sort the data
"""
flattened_list = list(Flatten(DictToList(extensions_list)))
fieldnames = flattened_list[0].keys() if flattened_list else []
desired_column_order = [
'id', 'name', 'num_permissions', 'num_installed', 'num_disabled',
'num_forced', 'permissions', 'installed', 'disabled', 'forced'
]
# Order the columns as desired. Columns other than those in
# |desired_column_order| will be in an unspecified order after these columns.
ordered_fieldnames = list(c for c in desired_column_order if c in fieldnames)
ordered_fieldnames.extend(
[x for x in desired_column_order if x not in ordered_fieldnames])
with open(csv_filename, mode='w') as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=ordered_fieldnames)
writer.writeheader()
for row in sorted(flattened_list, key=lambda ext: ext[sort_column]):
writer.writerow(row)
def main(args):
# Load the json format key that you downloaded from the Google API
# Console when you created your service account. For p12 keys, use the
# from_p12_keyfile method of ServiceAccountCredentials and specify the
# service account email address, p12 keyfile, and scopes.
service_credentials = Credentials.from_service_account_file(
args.service_account_key_path,
scopes=[
'https://www.googleapis.com/auth/admin.directory.device.chromebrowsers'
],
subject=args.admin_email)
try:
http = google_auth_httplib2.AuthorizedHttp(service_credentials, http=Http())
extensions_list = {}
base_request_url = 'https://admin.googleapis.com/admin/directory/v1.1beta1/customer/my_customer/devices/chromebrowsers'
request_parameters = ''
browsers_processed = 0
while True:
print('Making request to server ...')
data = json.loads(
http.request(base_request_url + '?' + request_parameters, 'GET')[1])
browsers_in_data = len(data['browsers'])
print('Request returned %s results, analyzing ...' % (browsers_in_data))
ComputeExtensionsList(extensions_list, data)
browsers_processed += browsers_in_data
if 'nextPageToken' not in data or not data['nextPageToken']:
break
print('%s browsers processed.' % (browsers_processed))
if (args.max_browsers_to_process is not None and
args.max_browsers_to_process <= browsers_processed):
print('Stopping at %s browsers processed.' % (browsers_processed))
break
request_parameters = ('pageToken={}').format(data['nextPageToken'])
finally:
print('Analyze results ...')
ExtensionListAsCsv(extensions_list, args.extension_list_csv)
print("Results written to '%s'" % (args.extension_list_csv))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='CBCM Extension Analyzer')
parser.add_argument(
'-k',
'--service_account_key_path',
metavar='FILENAME',
required=True,
help='The service account key file used to make API requests.')
parser.add_argument(
'-a',
'--admin_email',
required=True,
help='The admin user used to make the API requests.')
parser.add_argument(
'-x',
'--extension_list_csv',
metavar='FILENAME',
default='./extension_list.csv',
help='Generate an extension list to the specified CSV '
'file')
parser.add_argument(
'-m',
'--max_browsers_to_process',
type=int,
help='Maximum number of browsers to process. (Must be > 0).')
args = parser.parse_args()
if (args.max_browsers_to_process is not None and
args.max_browsers_to_process <= 0):
print('max_browsers_to_process must be > 0.')
parser.print_help()
sys.exit(1)
main(args)
# Python script to compute an extension list using Takeout API
## Setup Access to API
Before using CBCM Takeout API you will need to do some initial setup:
1. Enable the Admin SDK API (if not already enabled) in the Google Developer
Console by following this
[link](https://console.developers.google.com/apis/api/admin.googleapis.com/overview?project=_)
and selecting the project on which you wish to enable the API.
1. Create a service account and have the necessary service account keys for
this service account. You can follow
[this](https://developers.google.com/admin-sdk/directory/v1/guides/delegation)
for creating a service account and getting the service account keys. You
must grant the service account the "Service Account User" role on the
permissions during creation.
1. The client ID for this service account will need to be authorized for the
OAuth scopes listed below in the Admin Console under Security -> Advanced
settings -> Manage API client access. On this page, the Client ID
corresponds to the Unique ID of your service account. You will need to
authorize the client ID for the scope:
* https://www.googleapis.com/auth/admin.directory.device.chromebrowsers.readonly
## Running the Script
You can download the scripts
[here](https://chromium.googlesource.com/chromium/src/+/refs/heads/master/docs/enterprise/extension_query.py)
or here [here](https://chromium.googlesource.com/chromium/src/+/refs/heads/master/docs/enterprise/extension_query_py2.py)
for a python 2.7 compatible version.
With the service account keys, you can now run the script which uses Google API
Client Libraries to make the necessary queries to the API. The script requires
that both the service account keys you downloaded from the developers console as
well as the e-mail of an admin user in your domain that is allowed to access the
data you wish to query.
You can run the script from the command-line: python extension_query.py
--service_account_key_path <service_account_key_file> --admin_email
<admin_email> (also: `python extension_query.py --help` for a reminder of the
argument names)
Example Data output Here is an example of what that data will look like:
![Sample Results](https://chromium.googlesource.com/chromium/src/+/refs/heads/master/docs/enterprise/extension_query_sample.png)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment