Commit 91ce1b14 authored by Juan Antonio Navarro Pérez, committed by Commit Bot

[pinboard] Fetch state from cloud storage if not found locally

This makes it easier to transfer the state from one workstation to
another, without having to fetch again the results of all past
pinpoint jobs.

Bug: 1029039
Change-Id: I83eebe5d65cdda80b2f9255a584dd86ecd40f7bc
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1942047
Reviewed-by: Sami Kyöstilä <skyostil@chromium.org>
Reviewed-by: Ross McIlroy <rmcilroy@chromium.org>
Commit-Queue: Juan Antonio Navarro Pérez <perezju@chromium.org>
Cr-Commit-Position: refs/heads/master@{#719987}
parent 9e24d318
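
For context, the core of the change is a download-if-missing fallback for the tool's cached files. The sketch below condenses that pattern into a standalone snippet; it mirrors LoadJobsState and DownloadFromCloudStorage from the diff, but the bucket path, cache directory, and the direct `gsutil cp` subprocess call are illustrative assumptions (the real tool goes through its own gsutil wrapper and its CLOUD_STORAGE_DIR constant).

import json
import logging
import os
import posixpath
import subprocess

# Assumed values for illustration only; the real tool defines its own
# CLOUD_STORAGE_DIR and cache location.
CLOUD_STORAGE_DIR = 'gs://example-bucket/pinboard'
CACHE_DIR = os.path.expanduser('~/.cache/pinboard')
JOBS_STATE_FILE = 'jobs_state.json'


def CachedFilePath(filename):
  # Make sure the local cache directory exists before reading/writing to it.
  os.makedirs(CACHE_DIR, exist_ok=True)
  return os.path.join(CACHE_DIR, filename)


def DownloadFromCloudStorage(filepath):
  """Try to fetch a copy of |filepath| from cloud storage; True on success."""
  remote = posixpath.join(CLOUD_STORAGE_DIR, os.path.basename(filepath))
  try:
    # The real tool calls its gsutil wrapper; a plain CLI invocation stands in
    # for it here.
    subprocess.check_call(['gsutil', 'cp', remote, filepath])
    logging.info('Downloaded copy of %s from cloud storage.', filepath)
    return True
  except (subprocess.CalledProcessError, OSError):
    logging.info('Failed to download copy of %s from cloud storage.', filepath)
    return False


def LoadJobsState():
  """Load the latest recorded state, falling back to cloud storage."""
  local_path = CachedFilePath(JOBS_STATE_FILE)
  # The download only runs when there is no local copy, so a fresh workstation
  # picks up the shared state instead of refetching all past pinpoint jobs.
  if os.path.exists(local_path) or DownloadFromCloudStorage(local_path):
    with open(local_path) as f:
      return json.load(f)
  logging.info('No jobs state found. Creating empty state.')
  return []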
@@ -142,9 +142,10 @@ def CollectPinpointResults(state):
 
 
 def LoadJobsState():
   """Load the latest recorded state of pinpoint jobs."""
   local_path = CachedFilePath(JOBS_STATE_FILE)
-  if os.path.exists(local_path):
+  if os.path.exists(local_path) or DownloadFromCloudStorage(local_path):
     return LoadJsonFile(local_path)
   else:
+    logging.info('No jobs state found. Creating empty state.')
     return []
@@ -164,26 +165,52 @@ def UpdateJobsState(state):
   UploadToCloudStorage(local_path)
 
 
-def AggregateAndUploadResults(state):
-  """Aggregate results collected and upload them to cloud storage."""
-  cached_results = CachedFilePath(DATASET_PKL_FILE)
-  dfs = []
+def GetCachedDataset():
+  """Load the latest dataset with cached data."""
+  local_path = CachedFilePath(DATASET_PKL_FILE)
+  if os.path.exists(local_path) or DownloadFromCloudStorage(local_path):
+    return pd.read_pickle(local_path)
+  else:
+    return None
+
+
+def UpdateCachedDataset(df):
+  """Write back the dataset with cached data."""
+  local_path = CachedFilePath(DATASET_PKL_FILE)
+  df.to_pickle(local_path)
+  UploadToCloudStorage(local_path)
 
-  keep_revisions = set(item['revision'] for item in state)
-  if os.path.exists(cached_results):
-    # To speed things up, we take the cache computed from previous results.
-    df = pd.read_pickle(cached_results)
-    # Drop possible old data from revisions no longer in recent state.
-    df = df[df['revision'].isin(keep_revisions)]
-    dfs.append(df)
+
+def GetItemsToUpdate(state):
+  """Select jobs with new data to download and cached data for existing jobs.
+
+  This also filters out old revisions to keep only recent (6 months) data.
+
+  Returns:
+    new_items: A list of job items from which to get data.
+    cached_df: A DataFrame with existing cached data, may be None.
+  """
+  from_date = str(TimeAgo(months=6).date())
+  new_items = [item for item in state if item['timestamp'] > from_date]
+  df = GetCachedDataset()
+  if df is not None:
+    recent_revisions = set(item['revision'] for item in new_items)
+    df = df[df['revision'].isin(recent_revisions)]
     known_revisions = set(df['revision'])
-  else:
-    known_revisions = set()
+    new_items = [
+        item for item in new_items if item['revision'] not in known_revisions]
+  return new_items, df
+
+
+def AggregateAndUploadResults(new_items, cached_df=None):
+  """Aggregate results collected and upload them to cloud storage."""
+  dfs = []
+  if cached_df is not None:
+    dfs.append(cached_df)
 
   found_new = False
-  for item in state:
-    if item['revision'] in known_revisions or _SkipProcessing(item):
-      # Revision is already in cache, jobs are not ready, or all have failed.
+  for item in new_items:
+    if _SkipProcessing(item):  # Jobs are not ready, or all have failed.
       continue
     if not found_new:
       logging.info('Processing data from new results:')
@@ -197,7 +224,7 @@ def AggregateAndUploadResults(state):
 
   # Otherwise update our cache and upload.
   df = pd.concat(dfs, ignore_index=True)
-  df.to_pickle(cached_results)
+  UpdateCachedDataset(df)
 
   # Drop revisions with no results and mark the last result for each metric,
   # both with/without patch, as a 'reference'. This allows making score cards
@@ -327,6 +354,18 @@ def UploadToCloudStorage(filepath):
       filepath, posixpath.join(CLOUD_STORAGE_DIR, os.path.basename(filepath)))
 
 
+def DownloadFromCloudStorage(filepath):
+  """Get the given file from cloud storage."""
+  try:
+    gsutil.Copy(
+        posixpath.join(CLOUD_STORAGE_DIR, os.path.basename(filepath)), filepath)
+    logging.info('Downloaded copy of %s from cloud storage.', filepath)
+    return True
+  except subprocess.CalledProcessError:
+    logging.info('Failed to download copy of %s from cloud storage.', filepath)
+    return False
+
+
 def LoadJsonFile(filename):
   with open(filename) as f:
     return json.load(f)
@@ -353,12 +392,6 @@ def SetUpLogging(level):
   logger.addHandler(h2)
 
 
-def SelectRecentRevisions(state):
-  """Filter out old revisions from state to keep only recent (6 months) data."""
-  from_date = str(TimeAgo(months=6).date())
-  return [item for item in state if item['timestamp'] > from_date]
-
-
 def Main():
   SetUpLogging(level=logging.INFO)
   actions = ('start', 'collect', 'upload')
@@ -377,15 +410,19 @@ def Main():
 
     logging.info('=== auto run for %s ===', args.date)
     args.actions = actions
 
+  cached_results_dir = CachedFilePath('job_results')
+  if not os.path.isdir(cached_results_dir):
+    os.makedirs(cached_results_dir)
+
   state = LoadJobsState()
   try:
     if 'start' in args.actions:
       StartPinpointJobs(state, args.date)
-    recent_state = SelectRecentRevisions(state)
+    new_items, cached_df = GetItemsToUpdate(state)
     if 'collect' in args.actions:
-      CollectPinpointResults(recent_state)
+      CollectPinpointResults(new_items)
   finally:
     UpdateJobsState(state)
   if 'upload' in args.actions:
-    AggregateAndUploadResults(recent_state)
+    AggregateAndUploadResults(new_items, cached_df)
@@ -34,6 +34,9 @@ class PinboardToolTests(unittest.TestCase):
         'cli_tools.pinboard.pinboard.subprocess').start()
     self.upload_to_cloud = mock.patch(
         'cli_tools.pinboard.pinboard.UploadToCloudStorage').start()
+    self.download_from_cloud = mock.patch(
+        'cli_tools.pinboard.pinboard.DownloadFromCloudStorage').start()
+    self.download_from_cloud.return_value = False
 
   def tearDown(self):
     mock.patch.stopall()
@@ -109,7 +112,8 @@ class PinboardToolTests(unittest.TestCase):
     self.assertEqual(self.upload_to_cloud.call_count, 2)
 
   @mock.patch('cli_tools.pinboard.pinboard.GetRevisionResults')
-  def testAggregateAndUploadResults(self, get_revision_results):
+  @mock.patch('cli_tools.pinboard.pinboard.TimeAgo')
+  def testAggregateAndUploadResults(self, time_ago, get_revision_results):
     state = [
         StateItem('a100', timestamp='2019-03-15', job1='completed'),
         StateItem('a200', timestamp='2019-03-16', job2='completed'),
@@ -129,26 +133,31 @@ class PinboardToolTests(unittest.TestCase):
       return df
 
     get_revision_results.side_effect = GetFakeResults
+    time_ago.return_value = pd.Timestamp('2018-10-20')
 
     # Only process first few revisions.
-    pinboard.AggregateAndUploadResults(state[:3])
+    new_items, cached_df = pinboard.GetItemsToUpdate(state[:3])
+    pinboard.AggregateAndUploadResults(new_items, cached_df)
     dataset_file = pinboard.CachedFilePath(pinboard.DATASET_CSV_FILE)
     df = pd.read_csv(dataset_file)
     self.assertEqual(set(df['revision']), set(['a100', 'a200']))
    self.assertTrue((df[df['reference']]['revision'] == 'a200').all())
 
     # Incrementally process the rest.
-    pinboard.AggregateAndUploadResults(state)
+    new_items, cached_df = pinboard.GetItemsToUpdate(state)
+    pinboard.AggregateAndUploadResults(new_items, cached_df)
     dataset_file = pinboard.CachedFilePath(pinboard.DATASET_CSV_FILE)
     df = pd.read_csv(dataset_file)
     self.assertEqual(set(df['revision']), set(['a100', 'a200', 'a500']))
     self.assertTrue((df[df['reference']]['revision'] == 'a500').all())
 
     # No new revisions. This should be a no-op.
-    pinboard.AggregateAndUploadResults(state)
+    new_items, cached_df = pinboard.GetItemsToUpdate(state)
+    pinboard.AggregateAndUploadResults(new_items, cached_df)
 
     self.assertEqual(get_revision_results.call_count, 4)
-    self.assertEqual(self.upload_to_cloud.call_count, 2)
+    # Uploads twice (the pkl and csv) on each call to aggregate results.
+    self.assertEqual(self.upload_to_cloud.call_count, 2 * 2)
 
   def testGetRevisionResults_simple(self):
     item = StateItem('2a66ba', timestamp='2019-03-17T23:50:16-07:00')