Commit 2e3363ad authored by John Budorick's avatar John Budorick Committed by Commit Bot

Delete //chrome/test/ispy.

Pretty sure this has been unused for years.

Change-Id: I573e2f58dcce72bf6a6ec74ca560fd71d23388b8
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2135901Reviewed-by: default avatarTheresa  <twellington@chromium.org>
Commit-Queue: John Budorick <jbudorick@chromium.org>
Cr-Commit-Position: refs/heads/master@{#756481}
parent 30b23f86
jbudorick@chromium.org
klundberg@chromium.org
application: google.com:ispy
version: 1
runtime: python27
api_version: 1
threadsafe: True
handlers:
- url: /.*
script: server.app.application
libraries:
- name: webapp2
version: latest
- name: jinja2
version: latest
- name: PIL
version: latest
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Implementation of CloudBucket using Google Cloud Storage as the backend."""
import os
import sys
# boto requires depot_tools/third_party be in the path. Use
# src/build/find_depot_tools.py to add this directory.
sys.path.append(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir,
os.pardir, os.pardir, 'build'))
import find_depot_tools
DEPOT_TOOLS_PATH = find_depot_tools.add_depot_tools_to_path()
sys.path.append(os.path.join(os.path.abspath(DEPOT_TOOLS_PATH), 'third_party'))
import boto
from ispy.common import cloud_bucket
class BotoCloudBucket(cloud_bucket.BaseCloudBucket):
"""Interfaces with GS using the boto library."""
def __init__(self, key, secret, bucket_name):
"""Initializes the bucket with a key, secret, and bucket_name.
Args:
key: the API key to access GS.
secret: the API secret to access GS.
bucket_name: the name of the bucket to connect to.
"""
uri = boto.storage_uri('', 'gs')
conn = uri.connect(key, secret)
self.bucket = conn.get_bucket(bucket_name)
def _GetKey(self, path):
key = boto.gs.key.Key(self.bucket)
key.key = path
return key
# override
def UploadFile(self, path, contents, content_type):
key = self._GetKey(path)
key.set_metadata('Content-Type', content_type)
key.set_contents_from_string(contents)
# Open permissions for the appengine account to read/write.
key.add_email_grant('FULL_CONTROL',
'ispy.google.com@appspot.gserviceaccount.com')
# override
def DownloadFile(self, path):
key = self._GetKey(path)
if key.exists():
return key.get_contents_as_string()
else:
raise cloud_bucket.FileNotFoundError
# override
def UpdateFile(self, path, contents):
key = self._GetKey(path)
if key.exists():
key.set_contents_from_string(contents)
else:
raise cloud_bucket.FileNotFoundError
# override
def RemoveFile(self, path):
key = self._GetKey(path)
key.delete()
# override
def FileExists(self, path):
key = self._GetKey(path)
return key.exists()
# override
def GetImageURL(self, path):
key = self._GetKey(path)
if key.exists():
# Corrects a bug in boto that incorrectly generates a url
# to a resource in Google Cloud Storage.
return key.generate_url(3600).replace('AWSAccessKeyId', 'GoogleAccessId')
else:
raise cloud_bucket.FileNotFoundError(path)
# override
def GetAllPaths(self, prefix):
return (key.key for key in self.bucket.get_all_keys(prefix=prefix))
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
def GetScriptToWaitForUnchangingDOM():
"""Gets Javascript that waits until the DOM is stable for 5 seconds.
Times out if the DOM is not stable within 30 seconds.
Returns:
Javascript as a string.
"""
return """
var target = document.body;
var callback = arguments[arguments.length - 1]
var timeout_id = setTimeout(function() {
callback()
}, 5000);
var observer = new MutationObserver(function(mutations) {
clearTimeout(timeout_id);
timeout_id = setTimeout(function() {
callback();
}, 5000);
}).observe(target, {attributes: true, childList: true,
characterData: true, subtree: true});
"""
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
var target = document.body;
var callback = arguments[arguments.length - 1]
var timeout_id = setTimeout(function() {
callback()
}, 5000);
var observer = new MutationObserver(function(mutations) {
clearTimeout(timeout_id);
timeout_id = setTimeout(function() {
callback();
}, 5000);
}).observe(target, {attributes: true, childList: true,
characterData: true, subtree: true});
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Abstract injector class for GS requests."""
class FileNotFoundError(Exception):
"""Thrown by a subclass of CloudBucket when a file is not found."""
pass
class BaseCloudBucket(object):
"""An abstract base class for working with GS."""
def UploadFile(self, path, contents, content_type):
"""Uploads a file to GS.
Args:
path: where in GS to upload the file.
contents: the contents of the file to be uploaded.
content_type: the MIME Content-Type of the file.
"""
raise NotImplementedError
def DownloadFile(self, path):
"""Downsloads a file from GS.
Args:
path: the location in GS to download the file from.
Returns:
String contents of the file downloaded.
Raises:
bucket_injector.NotFoundException: if the file is not found.
"""
raise NotImplementedError
def UpdateFile(self, path, contents):
"""Uploads a file to GS.
Args:
path: location of the file in GS to update.
contents: the contents of the file to be updated.
"""
raise NotImplementedError
def RemoveFile(self, path):
"""Removes a file from GS.
Args:
path: the location in GS to download the file from.
"""
raise NotImplementedError
def FileExists(self, path):
"""Checks if a file exists in GS.
Args:
path: the location in GS of the file.
Returns:
boolean representing whether the file exists in GS.
"""
raise NotImplementedError
def GetImageURL(self, path):
"""Gets a URL to an item in GS from its path.
Args:
path: the location in GS of a file.
Returns:
an url to a file in GS.
Raises:
bucket_injector.NotFoundException: if the file is not found.
"""
raise NotImplementedError
def GetAllPaths(self, prefix):
"""Gets paths to files in GS that start with a prefix.
Args:
prefix: the prefix to filter files in GS.
Returns:
a generator of paths to files in GS.
"""
raise NotImplementedError
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Constants for I-Spy."""
BUCKET = 'ispy-bucket'
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utilities for performing pixel-by-pixel image comparision."""
import itertools
import StringIO
from PIL import Image
def _AreTheSameSize(images):
"""Returns whether a set of images are the size size.
Args:
images: a list of images to compare.
Returns:
boolean.
Raises:
Exception: One image or fewer is passed in.
"""
if len(images) > 1:
return all(images[0].size == img.size for img in images[1:])
else:
raise Exception('No images passed in.')
def _GetDifferenceWithMask(image1, image2, mask=None,
masked_color=(225, 225, 225, 255),
same_color=(255, 255, 255, 255),
different_color=(210, 0, 0, 255)):
"""Returns an image representing the difference between the two images.
This function computes the difference between two images taking into
account a mask if it is provided. The final three arguments represent
the coloration of the generated image.
Args:
image1: the first image to compare.
image2: the second image to compare.
mask: an optional mask image consisting of only black and white pixels
where white pixels indicate the portion of the image to be masked out.
masked_color: the color of a masked section in the resulting image.
same_color: the color of an unmasked section that is the same.
between images 1 and 2 in the resulting image.
different_color: the color of an unmasked section that is different
between images 1 and 2 in the resulting image.
Returns:
A 2-tuple with an image representing the unmasked difference between the
two input images and the number of different pixels.
Raises:
Exception: if image1, image2, and mask are not the same size.
"""
image_mask = mask
if not mask:
image_mask = Image.new('RGBA', image1.size, (0, 0, 0, 255))
if not _AreTheSameSize([image1, image2, image_mask]):
raise Exception('images and mask must be the same size.')
image_diff = Image.new('RGBA', image1.size, (0, 0, 0, 255))
data = []
diff_pixels = 0
for m, px1, px2 in itertools.izip(image_mask.getdata(),
image1.getdata(),
image2.getdata()):
if m == (255, 255, 255, 255):
data.append(masked_color)
elif px1 == px2:
data.append(same_color)
else:
data.append(different_color)
diff_pixels += 1
image_diff.putdata(data)
return (image_diff, diff_pixels)
def CreateMask(images):
"""Computes a mask for a set of images.
Returns a difference mask that is computed from the images
which are passed in. The mask will have a white pixel
anywhere that the input images differ and a black pixel
everywhere else.
Args:
images: list of images to compute the mask from.
Returns:
an image of only black and white pixels where white pixels represent
areas in the input images that have differences.
Raises:
Exception: if the images passed in are not of the same size.
Exception: if fewer than one image is passed in.
"""
if not images:
raise Exception('mask must be created from one or more images.')
mask = Image.new('RGBA', images[0].size, (0, 0, 0, 255))
image = images[0]
for other_image in images[1:]:
mask = _GetDifferenceWithMask(
image,
other_image,
mask,
masked_color=(255, 255, 255, 255),
same_color=(0, 0, 0, 255),
different_color=(255, 255, 255, 255))[0]
return mask
def AddMasks(masks):
"""Combines a list of mask images into one mask image.
Args:
masks: a list of mask-images.
Returns:
a new mask that represents the sum of the masked
regions of the passed in list of mask-images.
Raises:
Exception: if masks is an empty list, or if masks are not the same size.
"""
if not masks:
raise Exception('masks must be a list containing at least one image.')
if len(masks) > 1 and not _AreTheSameSize(masks):
raise Exception('masks in list must be of the same size.')
white = (255, 255, 255, 255)
black = (0, 0, 0, 255)
masks_data = [mask.getdata() for mask in masks]
image = Image.new('RGBA', masks[0].size, black)
image.putdata([white if white in px_set else black
for px_set in itertools.izip(*masks_data)])
return image
def ConvertDiffToMask(diff):
"""Converts a Diff image into a Mask image.
Args:
diff: the diff image to convert.
Returns:
a new mask image where everything that was masked or different in the diff
is now masked.
"""
white = (255, 255, 255, 255)
black = (0, 0, 0, 255)
diff_data = diff.getdata()
image = Image.new('RGBA', diff.size, black)
image.putdata([black if px == white else white for px in diff_data])
return image
def VisualizeImageDifferences(image1, image2, mask=None):
"""Returns an image repesenting the unmasked differences between two images.
Iterates through the pixel values of two images and an optional
mask. If the pixel values are the same, or the pixel is masked,
(0,0,0) is stored for that pixel. Otherwise, (255,255,255) is stored.
This ultimately produces an image where unmasked differences between
the two images are white pixels, and everything else is black.
Args:
image1: an RGB image
image2: another RGB image of the same size as image1.
mask: an optional RGB image consisting of only white and black pixels
where the white pixels represent the parts of the images to be masked
out.
Returns:
A 2-tuple with an image representing the unmasked difference between the
two input images and the number of different pixels.
Raises:
Exception: if the two images and optional mask are different sizes.
"""
return _GetDifferenceWithMask(image1, image2, mask)
def InflateMask(image, passes):
"""A function that adds layers of pixels around the white edges of a mask.
This function evaluates a 'frontier' of valid pixels indices. Initially,
this frontier contains all indices in the image. However, with each pass
only the pixels' indices which were added to the mask by inflation
are added to the next pass's frontier. This gives the algorithm a
large upfront cost that scales negligably when the number of passes
is increased.
Args:
image: the RGBA PIL.Image mask to inflate.
passes: the number of passes to inflate the image by.
Returns:
A RGBA PIL.Image.
"""
inflated = Image.new('RGBA', image.size)
new_dataset = list(image.getdata())
old_dataset = list(image.getdata())
frontier = set(range(len(old_dataset)))
new_frontier = set()
l = [-1, 1]
def _ShadeHorizontal(index, px):
col = index % image.size[0]
if px == (255, 255, 255, 255):
for x in l:
if 0 <= col + x < image.size[0]:
if old_dataset[index + x] != (255, 255, 255, 255):
new_frontier.add(index + x)
new_dataset[index + x] = (255, 255, 255, 255)
def _ShadeVertical(index, px):
row = index / image.size[0]
if px == (255, 255, 255, 255):
for x in l:
if 0 <= row + x < image.size[1]:
if old_dataset[index + image.size[0] * x] != (255, 255, 255, 255):
new_frontier.add(index + image.size[0] * x)
new_dataset[index + image.size[0] * x] = (255, 255, 255, 255)
for _ in range(passes):
for index in frontier:
_ShadeHorizontal(index, old_dataset[index])
_ShadeVertical(index, old_dataset[index])
old_dataset, new_dataset = new_dataset, new_dataset
frontier, new_frontier = new_frontier, set()
inflated.putdata(new_dataset)
return inflated
def TotalDifferentPixels(image1, image2, mask=None):
"""Computes the number of different pixels between two images.
Args:
image1: the first RGB image to be compared.
image2: the second RGB image to be compared.
mask: an optional RGB image of only black and white pixels
where white pixels indicate the parts of the image to be masked out.
Returns:
the number of differing pixels between the images.
Raises:
Exception: if the images to be compared and the mask are not the same size.
"""
image_mask = mask
if not mask:
image_mask = Image.new('RGBA', image1.size, (0, 0, 0, 255))
if _AreTheSameSize([image1, image2, image_mask]):
total_diff = 0
for px1, px2, m in itertools.izip(image1.getdata(),
image2.getdata(),
image_mask.getdata()):
if m == (255, 255, 255, 255):
continue
elif px1 != px2:
total_diff += 1
else:
continue
return total_diff
else:
raise Exception('images and mask must be the same size')
def SameImage(image1, image2, mask=None):
"""Returns a boolean representing whether the images are the same.
Returns a boolean indicating whether two images are similar
enough to be considered the same. Essentially wraps the
TotalDifferentPixels function.
Args:
image1: an RGB image to compare.
image2: an RGB image to compare.
mask: an optional image of only black and white pixels
where white pixels are masked out
Returns:
True if the images are similar, False otherwise.
Raises:
Exception: if the images (and mask) are different sizes.
"""
different_pixels = TotalDifferentPixels(image1, image2, mask)
return different_pixels == 0
def EncodePNG(image):
"""Returns the PNG file-contents of the image.
Args:
image: an RGB image to be encoded.
Returns:
a base64 encoded string representing the image.
"""
f = StringIO.StringIO()
image.save(f, 'PNG')
encoded_image = f.getvalue()
f.close()
return encoded_image
def DecodePNG(png):
"""Returns a RGB image from PNG file-contents.
Args:
encoded_image: PNG file-contents of an RGB image.
Returns:
an RGB image
"""
return Image.open(StringIO.StringIO(png))
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import unittest
import sys
import os
from PIL import Image
import image_tools
def _GenImage(size, color):
return Image.new('RGBA', size, color)
def _AllPixelsOfColor(image, color):
return not any(px != color for px in image.getdata())
class ImageToolsTest(unittest.TestCase):
def setUp(self):
self.black25 = _GenImage((25, 25), (0, 0, 0, 255))
self.black50 = _GenImage((50, 50), (0, 0, 0, 255))
self.white25 = _GenImage((25, 25), (255, 255, 255, 255))
self.white50 = _GenImage((50, 50), (255, 255, 255, 255))
def testAreTheSameSize(self):
self.assertTrue(image_tools._AreTheSameSize([self.black25, self.black25]))
self.assertTrue(image_tools._AreTheSameSize([self.white25, self.white25]))
self.assertTrue(image_tools._AreTheSameSize([self.black50, self.black50]))
self.assertTrue(image_tools._AreTheSameSize([self.white50, self.white50]))
self.assertTrue(image_tools._AreTheSameSize([self.black25, self.white25]))
self.assertTrue(image_tools._AreTheSameSize([self.black50, self.white50]))
self.assertFalse(image_tools._AreTheSameSize([self.black50, self.black25]))
self.assertFalse(image_tools._AreTheSameSize([self.white50, self.white25]))
self.assertFalse(image_tools._AreTheSameSize([self.black25, self.white50]))
self.assertFalse(image_tools._AreTheSameSize([self.black50, self.white25]))
self.assertRaises(Exception, image_tools._AreTheSameSize, [])
self.assertRaises(Exception, image_tools._AreTheSameSize, [self.black50])
def testGetDifferenceWithMask(self):
self.assertTrue(_AllPixelsOfColor(image_tools._GetDifferenceWithMask(
self.black25, self.black25)[0], (255, 255, 255, 255)))
self.assertTrue(_AllPixelsOfColor(image_tools._GetDifferenceWithMask(
self.white25, self.black25)[0], (210, 0, 0, 255)))
self.assertTrue(_AllPixelsOfColor(image_tools._GetDifferenceWithMask(
self.black25, self.black25, mask=self.black25)[0],
(255, 255, 255, 255)))
self.assertTrue(_AllPixelsOfColor(image_tools._GetDifferenceWithMask(
self.black25, self.black25, mask=self.white25)[0],
(225, 225, 225, 255)))
self.assertTrue(_AllPixelsOfColor(image_tools._GetDifferenceWithMask(
self.black25, self.white25, mask=self.black25)[0],
(210, 0, 0, 255)))
self.assertTrue(_AllPixelsOfColor(image_tools._GetDifferenceWithMask(
self.black25, self.white25, mask=self.white25)[0],
(225, 225, 225, 255)))
self.assertRaises(Exception, image_tools._GetDifferenceWithMask,
self.white25,
self.black50)
self.assertRaises(Exception, image_tools._GetDifferenceWithMask,
self.white25,
self.white25,
mask=self.black50)
def testCreateMask(self):
m1 = image_tools.CreateMask([self.black25, self.white25])
self.assertTrue(_AllPixelsOfColor(m1, (255, 255, 255, 255)))
m2 = image_tools.CreateMask([self.black25, self.black25])
self.assertTrue(_AllPixelsOfColor(m2, (0, 0, 0, 255)))
m3 = image_tools.CreateMask([self.white25, self.white25])
self.assertTrue(_AllPixelsOfColor(m3, (0, 0, 0, 255)))
def testAddMasks(self):
m1 = image_tools.CreateMask([self.black25, self.white25])
m2 = image_tools.CreateMask([self.black25, self.black25])
m3 = image_tools.CreateMask([self.black50, self.black50])
self.assertTrue(_AllPixelsOfColor(image_tools.AddMasks([m1]),
(255, 255, 255, 255)))
self.assertTrue(_AllPixelsOfColor(image_tools.AddMasks([m2]),
(0, 0, 0, 255)))
self.assertTrue(_AllPixelsOfColor(image_tools.AddMasks([m1, m2]),
(255, 255, 255, 255)))
self.assertTrue(_AllPixelsOfColor(image_tools.AddMasks([m1, m1]),
(255, 255, 255, 255)))
self.assertTrue(_AllPixelsOfColor(image_tools.AddMasks([m2, m2]),
(0, 0, 0, 255)))
self.assertTrue(_AllPixelsOfColor(image_tools.AddMasks([m3]),
(0, 0, 0, 255)))
self.assertRaises(Exception, image_tools.AddMasks, [])
self.assertRaises(Exception, image_tools.AddMasks, [m1, m3])
def testTotalDifferentPixels(self):
self.assertEquals(image_tools.TotalDifferentPixels(self.white25,
self.white25),
0)
self.assertEquals(image_tools.TotalDifferentPixels(self.black25,
self.black25),
0)
self.assertEquals(image_tools.TotalDifferentPixels(self.white25,
self.black25),
25*25)
self.assertEquals(image_tools.TotalDifferentPixels(self.white25,
self.black25,
mask=self.white25),
0)
self.assertEquals(image_tools.TotalDifferentPixels(self.white25,
self.white25,
mask=self.white25),
0)
self.assertEquals(image_tools.TotalDifferentPixels(self.white25,
self.black25,
mask=self.black25),
25*25)
self.assertEquals(image_tools.TotalDifferentPixels(self.white25,
self.white25,
mask=self.black25),
0)
self.assertRaises(Exception, image_tools.TotalDifferentPixels,
self.white25, self.white50)
self.assertRaises(Exception, image_tools.TotalDifferentPixels,
self.white25, self.white25, mask=self.white50)
def testSameImage(self):
self.assertTrue(image_tools.SameImage(self.white25, self.white25))
self.assertFalse(image_tools.SameImage(self.white25, self.black25))
self.assertTrue(image_tools.SameImage(self.white25, self.black25,
mask=self.white25))
self.assertFalse(image_tools.SameImage(self.white25, self.black25,
mask=self.black25))
self.assertTrue(image_tools.SameImage(self.black25, self.black25))
self.assertTrue(image_tools.SameImage(self.black25, self.black25,
mask=self.white25))
self.assertTrue(image_tools.SameImage(self.white25, self.white25,
mask=self.white25))
self.assertRaises(Exception, image_tools.SameImage,
self.white25, self.white50)
self.assertRaises(Exception, image_tools.SameImage,
self.white25, self.white25,
mask=self.white50)
def testInflateMask(self):
cross_image = Image.new('RGBA', (3, 3))
white_image = Image.new('RGBA', (3, 3))
dot_image = Image.new('RGBA', (3, 3))
b = (0, 0, 0, 255)
w = (255, 255, 255, 255)
dot_image.putdata([b, b, b,
b, w, b,
b, b, b])
cross_image.putdata([b, w, b,
w, w, w,
b, w, b])
white_image.putdata([w, w, w,
w, w, w,
w, w, w])
self.assertEquals(list(image_tools.InflateMask(dot_image, 1).getdata()),
list(cross_image.getdata()))
self.assertEquals(list(image_tools.InflateMask(dot_image, 0).getdata()),
list(dot_image.getdata()))
self.assertEquals(list(image_tools.InflateMask(dot_image, 2).getdata()),
list(white_image.getdata()))
self.assertEquals(list(image_tools.InflateMask(dot_image, 3).getdata()),
list(white_image.getdata()))
self.assertEquals(list(image_tools.InflateMask(self.black25, 1).getdata()),
list(self.black25.getdata()))
def testPNGEncodeDecode(self):
self.assertTrue(_AllPixelsOfColor(
image_tools.DecodePNG(
image_tools.EncodePNG(self.white25)), (255, 255, 255, 255)))
self.assertTrue(_AllPixelsOfColor(
image_tools.DecodePNG(
image_tools.EncodePNG(self.black25)), (0, 0, 0, 255)))
if __name__ == '__main__':
unittest.main()
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Internal utilities for managing I-Spy test results in Google Cloud Storage.
See the ispy.ispy_api module for the external API.
"""
import collections
import itertools
import json
import os
import sys
import image_tools
_INVALID_EXPECTATION_CHARS = ['/', '\\', ' ', '"', '\'']
def IsValidExpectationName(expectation_name):
return not any(c in _INVALID_EXPECTATION_CHARS for c in expectation_name)
def GetExpectationPath(expectation, file_name=''):
"""Get the path to a test file in the given test run and expectation.
Args:
expectation: name of the expectation.
file_name: name of the file.
Returns:
the path as a string relative to the bucket.
"""
return 'expectations/%s/%s' % (expectation, file_name)
def GetFailurePath(test_run, expectation, file_name=''):
"""Get the path to a failure file in the given test run and test.
Args:
test_run: name of the test run.
expectation: name of the expectation.
file_name: name of the file.
Returns:
the path as a string relative to the bucket.
"""
return GetTestRunPath(test_run, '%s/%s' % (expectation, file_name))
def GetTestRunPath(test_run, file_name=''):
"""Get the path to a the given test run.
Args:
test_run: name of the test run.
file_name: name of the file.
Returns:
the path as a string relative to the bucket.
"""
return 'failures/%s/%s' % (test_run, file_name)
class ISpyUtils(object):
"""Utility functions for working with an I-Spy google storage bucket."""
def __init__(self, cloud_bucket):
"""Initialize with a cloud bucket instance to supply GS functionality.
Args:
cloud_bucket: An object implementing the cloud_bucket.BaseCloudBucket
interface.
"""
self.cloud_bucket = cloud_bucket
def UploadImage(self, full_path, image):
"""Uploads an image to a location in GS.
Args:
full_path: the path to the file in GS including the file extension.
image: a RGB PIL.Image to be uploaded.
"""
self.cloud_bucket.UploadFile(
full_path, image_tools.EncodePNG(image), 'image/png')
def DownloadImage(self, full_path):
"""Downloads an image from a location in GS.
Args:
full_path: the path to the file in GS including the file extension.
Returns:
The downloaded RGB PIL.Image.
Raises:
cloud_bucket.NotFoundError: if the path to the image is not valid.
"""
return image_tools.DecodePNG(self.cloud_bucket.DownloadFile(full_path))
def UpdateImage(self, full_path, image):
"""Updates an existing image in GS, preserving permissions and metadata.
Args:
full_path: the path to the file in GS including the file extension.
image: a RGB PIL.Image.
"""
self.cloud_bucket.UpdateFile(full_path, image_tools.EncodePNG(image))
def GenerateExpectation(self, expectation, images):
"""Creates and uploads an expectation to GS from a set of images and name.
This method generates a mask from the uploaded images, then
uploads the mask and first of the images to GS as a expectation.
Args:
expectation: name for this expectation, any existing expectation with the
name will be replaced.
images: a list of RGB encoded PIL.Images
Raises:
ValueError: if the expectation name is invalid.
"""
if not IsValidExpectationName(expectation):
raise ValueError("Expectation name contains an illegal character: %s." %
str(_INVALID_EXPECTATION_CHARS))
mask = image_tools.InflateMask(image_tools.CreateMask(images), 7)
self.UploadImage(
GetExpectationPath(expectation, 'expected.png'), images[0])
self.UploadImage(GetExpectationPath(expectation, 'mask.png'), mask)
def PerformComparison(self, test_run, expectation, actual):
"""Runs an image comparison, and uploads discrepancies to GS.
Args:
test_run: the name of the test_run.
expectation: the name of the expectation to use for comparison.
actual: an RGB-encoded PIL.Image that is the actual result.
Raises:
cloud_bucket.NotFoundError: if the given expectation is not found.
ValueError: if the expectation name is invalid.
"""
if not IsValidExpectationName(expectation):
raise ValueError("Expectation name contains an illegal character: %s." %
str(_INVALID_EXPECTATION_CHARS))
expectation_tuple = self.GetExpectation(expectation)
if not image_tools.SameImage(
actual, expectation_tuple.expected, mask=expectation_tuple.mask):
self.UploadImage(
GetFailurePath(test_run, expectation, 'actual.png'), actual)
diff, diff_pxls = image_tools.VisualizeImageDifferences(
expectation_tuple.expected, actual, mask=expectation_tuple.mask)
self.UploadImage(GetFailurePath(test_run, expectation, 'diff.png'), diff)
self.cloud_bucket.UploadFile(
GetFailurePath(test_run, expectation, 'info.txt'),
json.dumps({
'different_pixels': diff_pxls,
'fraction_different':
diff_pxls / float(actual.size[0] * actual.size[1])}),
'application/json')
def GetExpectation(self, expectation):
"""Returns the given expectation from GS.
Args:
expectation: the name of the expectation to get.
Returns:
A named tuple: 'Expectation', containing two images: expected and mask.
Raises:
cloud_bucket.NotFoundError: if the test is not found in GS.
"""
Expectation = collections.namedtuple('Expectation', ['expected', 'mask'])
return Expectation(self.DownloadImage(GetExpectationPath(expectation,
'expected.png')),
self.DownloadImage(GetExpectationPath(expectation,
'mask.png')))
def ExpectationExists(self, expectation):
"""Returns whether the given expectation exists in GS.
Args:
expectation: the name of the expectation to check.
Returns:
A boolean indicating whether the test exists.
"""
expected_image_exists = self.cloud_bucket.FileExists(
GetExpectationPath(expectation, 'expected.png'))
mask_image_exists = self.cloud_bucket.FileExists(
GetExpectationPath(expectation, 'mask.png'))
return expected_image_exists and mask_image_exists
def FailureExists(self, test_run, expectation):
"""Returns whether a failure for the expectation exists for the given run.
Args:
test_run: the name of the test_run.
expectation: the name of the expectation that failed.
Returns:
A boolean indicating whether the failure exists.
"""
actual_image_exists = self.cloud_bucket.FileExists(
GetFailurePath(test_run, expectation, 'actual.png'))
test_exists = self.ExpectationExists(expectation)
info_exists = self.cloud_bucket.FileExists(
GetFailurePath(test_run, expectation, 'info.txt'))
return test_exists and actual_image_exists and info_exists
def RemoveExpectation(self, expectation):
"""Removes an expectation and all associated failures with that test.
Args:
expectation: the name of the expectation to remove.
"""
test_paths = self.cloud_bucket.GetAllPaths(
GetExpectationPath(expectation))
for path in test_paths:
self.cloud_bucket.RemoveFile(path)
def GenerateExpectationPinkOut(self, expectation, images, pint_out, rgb):
"""Uploads an ispy-test to GS with the pink_out workaround.
Args:
expectation: the name of the expectation to be uploaded.
images: a json encoded list of base64 encoded png images.
pink_out: an image.
RGB: a json list representing the RGB values of a color to mask out.
Raises:
ValueError: if expectation name is invalid.
"""
if not IsValidExpectationName(expectation):
raise ValueError("Expectation name contains an illegal character: %s." %
str(_INVALID_EXPECTATION_CHARS))
# convert the pink_out into a mask
black = (0, 0, 0, 255)
white = (255, 255, 255, 255)
pink_out.putdata(
[black if px == (rgb[0], rgb[1], rgb[2], 255) else white
for px in pink_out.getdata()])
mask = image_tools.CreateMask(images)
mask = image_tools.InflateMask(image_tools.CreateMask(images), 7)
combined_mask = image_tools.AddMasks([mask, pink_out])
self.UploadImage(GetExpectationPath(expectation, 'expected.png'), images[0])
self.UploadImage(GetExpectationPath(expectation, 'mask.png'), combined_mask)
def RemoveFailure(self, test_run, expectation):
"""Removes a failure from GS.
Args:
test_run: the name of the test_run.
expectation: the expectation on which the failure to be removed occured.
"""
failure_paths = self.cloud_bucket.GetAllPaths(
GetFailurePath(test_run, expectation))
for path in failure_paths:
self.cloud_bucket.RemoveFile(path)
def GetFailure(self, test_run, expectation):
"""Returns a given test failure's expected, diff, and actual images.
Args:
test_run: the name of the test_run.
expectation: the name of the expectation the result corresponds to.
Returns:
A named tuple: Failure containing three images: expected, diff, and
actual.
Raises:
cloud_bucket.NotFoundError: if the result is not found in GS.
"""
expected = self.DownloadImage(
GetExpectationPath(expectation, 'expected.png'))
actual = self.DownloadImage(
GetFailurePath(test_run, expectation, 'actual.png'))
diff = self.DownloadImage(
GetFailurePath(test_run, expectation, 'diff.png'))
info = json.loads(self.cloud_bucket.DownloadFile(
GetFailurePath(test_run, expectation, 'info.txt')))
Failure = collections.namedtuple(
'Failure', ['expected', 'diff', 'actual', 'info'])
return Failure(expected, diff, actual, info)
def GetAllPaths(self, prefix, max_keys=None, marker=None, delimiter=None):
"""Gets urls to all files in GS whose path starts with a given prefix.
Args:
prefix: the prefix to filter files in GS by.
max_keys: Integer. Specifies the maximum number of objects returned
marker: String. Only objects whose fullpath starts lexicographically
after marker (exclusively) will be returned
delimiter: String. Turns on directory mode, specifies characters
to be used as directory separators
Returns:
a list containing urls to all objects that started with
the prefix.
"""
return self.cloud_bucket.GetAllPaths(
prefix, max_keys=max_keys, marker=marker, delimiter=delimiter)
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
from PIL import Image
import sys
import unittest
import cloud_bucket
import image_tools
import ispy_utils
import mock_cloud_bucket
class ISpyUtilsUnitTest(unittest.TestCase):
def setUp(self):
# Set up structures that will be reused throughout testing.
self.bucket = mock_cloud_bucket.MockCloudBucket()
self.ispy_utils = ispy_utils.ISpyUtils(self.bucket)
self.white = Image.new('RGBA', (25, 25), (255, 255, 255, 255))
self.red = Image.new('RGBA', (25, 25), (255, 0, 0, 255))
self.black = Image.new('RGBA', (25, 25), (0, 0, 0, 255))
self.masked = Image.new('RGBA', (25, 25), (210, 0, 0, 255))
def testUploadImage(self):
self.bucket.Reset()
# Upload some images to the datastore.
self.ispy_utils.UploadImage('path/to/white.png', self.white)
self.ispy_utils.UploadImage('path/to/black.png', self.black)
self.ispy_utils.UploadImage('path/to/red.png', self.red)
# Confirm that the images actually got uploaded.
self.assertEquals(self.bucket.datastore['path/to/white.png'],
image_tools.EncodePNG(self.white))
self.assertEquals(self.bucket.datastore['path/to/black.png'],
image_tools.EncodePNG(self.black))
self.assertEquals(self.bucket.datastore['path/to/red.png'],
image_tools.EncodePNG(self.red))
def testDownloadImage(self):
self.bucket.Reset()
# Upload some images to the datastore.
self.ispy_utils.UploadImage('path/to/white.png', self.white)
self.ispy_utils.UploadImage('path/to/black.png', self.black)
self.ispy_utils.UploadImage('path/to/red.png', self.red)
# Check that the DownloadImage function gets the correct images.
self.assertEquals(
image_tools.EncodePNG(
self.ispy_utils.DownloadImage('path/to/white.png')),
image_tools.EncodePNG(self.white))
self.assertEquals(
image_tools.EncodePNG(
self.ispy_utils.DownloadImage('path/to/black.png')),
image_tools.EncodePNG(self.black))
self.assertEquals(
image_tools.EncodePNG(
self.ispy_utils.DownloadImage('path/to/red.png')),
image_tools.EncodePNG(self.red))
# Check that the DownloadImage function throws an error for a
# nonexistant image.
self.assertRaises(cloud_bucket.FileNotFoundError,
self.ispy_utils.DownloadImage,
'path/to/yellow.png')
def testUpdateImage(self):
self.bucket.Reset()
# Upload some images to the datastore.
self.ispy_utils.UploadImage('path/to/image.png', self.white)
self.assertEquals(self.bucket.datastore['path/to/image.png'],
image_tools.EncodePNG(self.white))
self.ispy_utils.UpdateImage('path/to/image.png', self.black)
# Confirm that the image actually got updated.
self.assertEquals(self.bucket.datastore['path/to/image.png'],
image_tools.EncodePNG(self.black))
def testGenerateExpectation(self):
self.bucket.Reset()
# Upload some tests to the datastore.
self.ispy_utils.GenerateExpectation('test', [self.white, self.black])
self.ispy_utils.GenerateExpectation('test1', [self.black, self.black])
self.ispy_utils.GenerateExpectation('test2', [self.black])
# Confirm that the tests were successfully uploaded.
self.assertEquals(self.bucket.datastore[
ispy_utils.GetExpectationPath('test', 'expected.png')],
image_tools.EncodePNG(self.white))
self.assertEquals(self.bucket.datastore[
ispy_utils.GetExpectationPath('test', 'mask.png')],
image_tools.EncodePNG(self.white))
self.assertEquals(self.bucket.datastore[
ispy_utils.GetExpectationPath('test1', 'expected.png')],
image_tools.EncodePNG(self.black))
self.assertEquals(self.bucket.datastore[
ispy_utils.GetExpectationPath('test1', 'mask.png')],
image_tools.EncodePNG(self.black))
self.assertEquals(self.bucket.datastore[
ispy_utils.GetExpectationPath('test2', 'expected.png')],
image_tools.EncodePNG(self.black))
self.assertEquals(self.bucket.datastore[
ispy_utils.GetExpectationPath('test2', 'mask.png')],
image_tools.EncodePNG(self.black))
def testPerformComparison(self):
self.bucket.Reset()
self.ispy_utils.GenerateExpectation('test1', [self.red, self.red])
self.ispy_utils.PerformComparison('test', 'test1', self.black)
self.assertEquals(self.bucket.datastore[
ispy_utils.GetFailurePath('test', 'test1', 'actual.png')],
image_tools.EncodePNG(self.black))
self.ispy_utils.PerformComparison('test', 'test1', self.red)
self.assertTrue(
ispy_utils.GetFailurePath('test', 'test1', 'actual.png') in self.bucket
.datastore)
def testGetExpectation(self):
self.bucket.Reset()
# Upload some tests to the datastore
self.ispy_utils.GenerateExpectation('test1', [self.white, self.black])
self.ispy_utils.GenerateExpectation('test2', [self.red, self.white])
test1 = self.ispy_utils.GetExpectation('test1')
test2 = self.ispy_utils.GetExpectation('test2')
# Check that GetExpectation gets the appropriate tests.
self.assertEquals(image_tools.EncodePNG(test1.expected),
image_tools.EncodePNG(self.white))
self.assertEquals(image_tools.EncodePNG(test1.mask),
image_tools.EncodePNG(self.white))
self.assertEquals(image_tools.EncodePNG(test2.expected),
image_tools.EncodePNG(self.red))
self.assertEquals(image_tools.EncodePNG(test2.mask),
image_tools.EncodePNG(self.white))
# Check that GetExpectation throws an error for a nonexistant test.
self.assertRaises(
cloud_bucket.FileNotFoundError, self.ispy_utils.GetExpectation, 'test3')
def testExpectationExists(self):
self.bucket.Reset()
self.ispy_utils.GenerateExpectation('test1', [self.white, self.black])
self.ispy_utils.GenerateExpectation('test2', [self.white, self.black])
self.assertTrue(self.ispy_utils.ExpectationExists('test1'))
self.assertTrue(self.ispy_utils.ExpectationExists('test2'))
self.assertFalse(self.ispy_utils.ExpectationExists('test3'))
def testFailureExists(self):
self.bucket.Reset()
self.ispy_utils.GenerateExpectation('test1', [self.white, self.white])
self.ispy_utils.PerformComparison('test', 'test1', self.black)
self.ispy_utils.PerformComparison('test', 'test1', self.white)
self.assertTrue(self.ispy_utils.FailureExists('test', 'test1'))
self.assertFalse(self.ispy_utils.FailureExists('test', 'test2'))
def testRemoveExpectation(self):
self.bucket.Reset()
self.ispy_utils.GenerateExpectation('test1', [self.white, self.white])
self.ispy_utils.GenerateExpectation('test2', [self.white, self.white])
self.assertTrue(self.ispy_utils.ExpectationExists('test1'))
self.assertTrue(self.ispy_utils.ExpectationExists('test2'))
self.ispy_utils.RemoveExpectation('test1')
self.assertFalse(self.ispy_utils.ExpectationExists('test1'))
self.assertTrue(self.ispy_utils.ExpectationExists('test2'))
self.ispy_utils.RemoveExpectation('test2')
self.assertFalse(self.ispy_utils.ExpectationExists('test1'))
self.assertFalse(self.ispy_utils.ExpectationExists('test2'))
def testRemoveFailure(self):
self.bucket.Reset()
self.ispy_utils.GenerateExpectation('test1', [self.white, self.white])
self.ispy_utils.GenerateExpectation('test2', [self.white, self.white])
self.ispy_utils.PerformComparison('test', 'test1', self.black)
self.ispy_utils.RemoveFailure('test', 'test1')
self.assertFalse(self.ispy_utils.FailureExists('test', 'test1'))
self.assertTrue(self.ispy_utils.ExpectationExists('test1'))
self.assertFalse(self.ispy_utils.FailureExists('test', 'test2'))
self.assertTrue(self.ispy_utils.ExpectationExists('test2'))
def testGetFailure(self):
self.bucket.Reset()
# Upload a result
self.ispy_utils.GenerateExpectation('test1', [self.red, self.red])
self.ispy_utils.PerformComparison('test', 'test1', self.black)
res = self.ispy_utils.GetFailure('test', 'test1')
# Check that the function correctly got the result.
self.assertEquals(image_tools.EncodePNG(res.expected),
image_tools.EncodePNG(self.red))
self.assertEquals(image_tools.EncodePNG(res.diff),
image_tools.EncodePNG(self.masked))
self.assertEquals(image_tools.EncodePNG(res.actual),
image_tools.EncodePNG(self.black))
# Check that the function raises an error when given non-existant results.
self.assertRaises(cloud_bucket.FileNotFoundError,
self.ispy_utils.GetFailure, 'test', 'test2')
def testGetAllPaths(self):
self.bucket.Reset()
# Upload some tests.
self.ispy_utils.GenerateExpectation('test1', [self.white, self.black])
# Check that the function gets all urls matching the prefix.
self.assertEquals(
set(self.ispy_utils.GetAllPaths(
ispy_utils.GetExpectationPath('test1'))),
set([ispy_utils.GetExpectationPath('test1', 'expected.png'),
ispy_utils.GetExpectationPath('test1', 'mask.png')]))
self.assertEquals(
set(self.ispy_utils.GetAllPaths(
ispy_utils.GetExpectationPath('test3'))), set())
if __name__ == '__main__':
unittest.main()
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Subclass of CloudBucket used for testing."""
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
import cloud_bucket
class MockCloudBucket(cloud_bucket.BaseCloudBucket):
"""Subclass of CloudBucket used for testing."""
def __init__(self):
"""Initializes the MockCloudBucket with its datastore.
Returns:
An instance of MockCloudBucket.
"""
self.datastore = {}
def Reset(self):
"""Clears the MockCloudBucket's datastore."""
self.datastore = {}
# override
def UploadFile(self, path, contents, content_type):
self.datastore[path] = contents
# override
def DownloadFile(self, path):
if path in self.datastore:
return self.datastore[path]
else:
raise cloud_bucket.FileNotFoundError
# override
def UpdateFile(self, path, contents):
if not self.FileExists(path):
raise cloud_bucket.FileNotFoundError
self.UploadFile(path, contents, '')
# override
def RemoveFile(self, path):
if path in self.datastore:
self.datastore.pop(path)
# override
def FileExists(self, path):
return path in self.datastore
# override
def GetImageURL(self, path):
if path in self.datastore:
return path
else:
raise cloud_bucket.FileNotFoundError
# override
def GetAllPaths(self, prefix):
return (item[0] for item in self.datastore.items()
if item[0].startswith(prefix))
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import logging
import os
from distutils.version import LooseVersion
from PIL import Image
from common import cloud_bucket
from common import ispy_utils
class ISpyApi(object):
"""The public API for interacting with ISpy."""
def __init__(self, cloud_bucket):
"""Initializes the utility class.
Args:
cloud_bucket: a BaseCloudBucket in which to the version file,
expectations and results are to be stored.
"""
self._cloud_bucket = cloud_bucket
self._ispy = ispy_utils.ISpyUtils(self._cloud_bucket)
self._rebaselineable_cache = {}
def UpdateExpectationVersion(self, chrome_version, version_file):
"""Updates the most recent expectation version to the Chrome version.
Should be called after generating a new set of expectations.
Args:
chrome_version: the chrome version as a string of the form "31.0.123.4".
version_file: path to the version file in the cloud bucket. The version
file contains a json list of ordered Chrome versions for which
expectations exist.
"""
insert_pos = 0
expectation_versions = []
try:
expectation_versions = self._GetExpectationVersionList(version_file)
if expectation_versions:
try:
version = self._GetExpectationVersion(
chrome_version, expectation_versions)
if version == chrome_version:
return
insert_pos = expectation_versions.index(version)
except:
insert_pos = len(expectation_versions)
except cloud_bucket.FileNotFoundError:
pass
expectation_versions.insert(insert_pos, chrome_version)
logging.info('Updating expectation version...')
self._cloud_bucket.UploadFile(
version_file, json.dumps(expectation_versions),
'application/json')
def _GetExpectationVersion(self, chrome_version, expectation_versions):
"""Returns the expectation version for the given Chrome version.
Args:
chrome_version: the chrome version as a string of the form "31.0.123.4".
expectation_versions: Ordered list of Chrome versions for which
expectations exist, as stored in the version file.
Returns:
Expectation version string.
"""
# Find the closest version that is not greater than the chrome version.
for version in expectation_versions:
if LooseVersion(version) <= LooseVersion(chrome_version):
return version
raise Exception('No expectation exists for Chrome %s' % chrome_version)
def _GetExpectationVersionList(self, version_file):
"""Gets the list of expectation versions from google storage.
Args:
version_file: path to the version file in the cloud bucket. The version
file contains a json list of ordered Chrome versions for which
expectations exist.
Returns:
Ordered list of Chrome versions.
"""
try:
return json.loads(self._cloud_bucket.DownloadFile(version_file))
except:
return []
def _GetExpectationNameWithVersion(self, device_type, expectation,
chrome_version, version_file):
"""Get the expectation to be used with the current Chrome version.
Args:
device_type: string identifier for the device type.
expectation: name for the expectation to generate.
chrome_version: the chrome version as a string of the form "31.0.123.4".
Returns:
Version as an integer.
"""
version = self._GetExpectationVersion(
chrome_version, self._GetExpectationVersionList(version_file))
return self._CreateExpectationName(device_type, expectation, version)
def _CreateExpectationName(self, device_type, expectation, version):
"""Create the full expectation name from the expectation and version.
Args:
device_type: string identifier for the device type, example: mako
expectation: base name for the expectation, example: google.com
version: expectation version, example: 31.0.23.1
Returns:
Full expectation name as a string, example: mako:google.com(31.0.23.1)
"""
return '%s:%s(%s)' % (device_type, expectation, version)
def GenerateExpectation(self, device_type, expectation, chrome_version,
version_file, screenshots):
"""Create an expectation for I-Spy.
Args:
device_type: string identifier for the device type.
expectation: name for the expectation to generate.
chrome_version: the chrome version as a string of the form "31.0.123.4".
screenshots: a list of similar PIL.Images.
"""
# https://code.google.com/p/chromedriver/issues/detail?id=463
expectation_with_version = self._CreateExpectationName(
device_type, expectation, chrome_version)
if self._ispy.ExpectationExists(expectation_with_version):
logging.warning(
'I-Spy expectation \'%s\' already exists, overwriting.',
expectation_with_version)
logging.info('Generating I-Spy expectation...')
self._ispy.GenerateExpectation(expectation_with_version, screenshots)
def PerformComparison(self, test_run, device_type, expectation,
chrome_version, version_file, screenshot):
"""Compare a screenshot with the given expectation in I-Spy.
Args:
test_run: name for the test run.
device_type: string identifier for the device type.
expectation: name for the expectation to compare against.
chrome_version: the chrome version as a string of the form "31.0.123.4".
screenshot: a PIL.Image to compare.
"""
# https://code.google.com/p/chromedriver/issues/detail?id=463
logging.info('Performing I-Spy comparison...')
self._ispy.PerformComparison(
test_run,
self._GetExpectationNameWithVersion(
device_type, expectation, chrome_version, version_file),
screenshot)
def CanRebaselineToTestRun(self, test_run):
"""Returns whether the test run has associated expectations.
Returns:
True if RebaselineToTestRun() can be called for this test run.
"""
if test_run in self._rebaselineable_cache:
return True
return self._cloud_bucket.FileExists(
ispy_utils.GetTestRunPath(test_run, 'rebaseline.txt'))
def RebaselineToTestRun(self, test_run):
"""Update the version file to use expectations associated with |test_run|.
Args:
test_run: The name of the test run to rebaseline.
"""
rebaseline_path = ispy_utils.GetTestRunPath(test_run, 'rebaseline.txt')
rebaseline_attrib = json.loads(
self._cloud_bucket.DownloadFile(rebaseline_path))
self.UpdateExpectationVersion(
rebaseline_attrib['version'], rebaseline_attrib['version_file'])
self._cloud_bucket.RemoveFile(rebaseline_path)
def _SetTestRunRebaselineable(self, test_run, chrome_version, version_file):
"""Writes a JSON file containing the data needed to rebaseline.
Args:
test_run: The name of the test run to add the rebaseline file to.
chrome_version: the chrome version that can be rebaselined to (must have
associated Expectations).
version_file: the path of the version file associated with the test run.
"""
self._rebaselineable_cache[test_run] = True
self._cloud_bucket.UploadFile(
ispy_utils.GetTestRunPath(test_run, 'rebaseline.txt'),
json.dumps({
'version': chrome_version,
'version_file': version_file}),
'application/json')
def PerformComparisonAndPrepareExpectation(self, test_run, device_type,
expectation, chrome_version,
version_file, screenshots):
"""Perform comparison and generate an expectation that can used later.
The test run web UI will have a button to set the Expectations generated for
this version as the expectation for comparison with later versions.
Args:
test_run: The name of the test run to add the rebaseline file to.
device_type: string identifier for the device type.
chrome_version: the chrome version that can be rebaselined to (must have
associated Expectations).
version_file: the path of the version file associated with the test run.
screenshot: a list of similar PIL.Images.
"""
if not self.CanRebaselineToTestRun(test_run):
self._SetTestRunRebaselineable(test_run, chrome_version, version_file)
expectation_with_version = self._CreateExpectationName(
device_type, expectation, chrome_version)
self._ispy.GenerateExpectation(expectation_with_version, screenshots)
self._ispy.PerformComparison(
test_run,
self._GetExpectationNameWithVersion(
device_type, expectation, chrome_version, version_file),
screenshots[-1])
#!/usr/bin/env python
#
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import unittest
from PIL import Image
import ispy_api
from common import cloud_bucket
from common import mock_cloud_bucket
class ISpyApiTest(unittest.TestCase):
"""Unittest for the ISpy API."""
def setUp(self):
self.cloud_bucket = mock_cloud_bucket.MockCloudBucket()
self.ispy = ispy_api.ISpyApi(self.cloud_bucket)
self.white_img = Image.new('RGBA', (10, 10), (255, 255, 255, 255))
self.black_img = Image.new('RGBA', (10, 10), (0, 0, 0, 255))
def testGenerateExpectationsRunComparison(self):
self.ispy.GenerateExpectation(
'device', 'test', '1.1.1.1', 'versions.json',
[self.white_img, self.white_img])
self.ispy.UpdateExpectationVersion('1.1.1.1', 'versions.json')
self.ispy.PerformComparison(
'test1', 'device', 'test', '1.1.1.1', 'versions.json', self.white_img)
expect_name = self.ispy._CreateExpectationName(
'device', 'test', '1.1.1.1')
self.assertFalse(self.ispy._ispy.FailureExists('test1', expect_name))
self.ispy.PerformComparison(
'test2', 'device', 'test', '1.1.1.1','versions.json', self.black_img)
self.assertTrue(self.ispy._ispy.FailureExists('test2', expect_name))
def testUpdateExpectationVersion(self):
self.ispy.UpdateExpectationVersion('1.0.0.0', 'versions.json')
self.ispy.UpdateExpectationVersion('1.0.4.0', 'versions.json')
self.ispy.UpdateExpectationVersion('2.1.5.0', 'versions.json')
self.ispy.UpdateExpectationVersion('1.1.5.0', 'versions.json')
self.ispy.UpdateExpectationVersion('0.0.0.0', 'versions.json')
self.ispy.UpdateExpectationVersion('1.1.5.0', 'versions.json')
self.ispy.UpdateExpectationVersion('0.0.0.1', 'versions.json')
versions = json.loads(self.cloud_bucket.DownloadFile('versions.json'))
self.assertEqual(versions,
['2.1.5.0', '1.1.5.0', '1.0.4.0', '1.0.0.0', '0.0.0.1', '0.0.0.0'])
def testPerformComparisonAndPrepareExpectation(self):
self.assertFalse(self.ispy.CanRebaselineToTestRun('test'))
self.assertRaises(
cloud_bucket.FileNotFoundError,
self.ispy.PerformComparisonAndPrepareExpectation,
'test', 'device', 'expect', '1.0', 'versions.json',
[self.white_img, self.white_img])
self.assertTrue(self.ispy.CanRebaselineToTestRun('test'))
self.ispy.RebaselineToTestRun('test')
versions = json.loads(self.cloud_bucket.DownloadFile('versions.json'))
self.assertEqual(versions, ['1.0'])
self.ispy.PerformComparisonAndPrepareExpectation(
'test1', 'device', 'expect', '1.1', 'versions.json',
[self.white_img, self.white_img])
if __name__ == '__main__':
unittest.main()
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import sys
import webapp2
sys.path.append(os.path.join(os.path.dirname(__file__), os.pardir))
import debug_view_handler
import image_handler
import main_view_handler
import rebaseline_handler
import update_mask_handler
application = webapp2.WSGIApplication(
[('/update_mask', update_mask_handler.UpdateMaskHandler),
('/rebaseline', rebaseline_handler.RebaselineHandler),
('/debug_view', debug_view_handler.DebugViewHandler),
('/image', image_handler.ImageHandler),
('/', main_view_handler.MainViewHandler)], debug=True)
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Request handler to display the debug view for a Failure."""
import jinja2
import os
import sys
import webapp2
from common import ispy_utils
import views
JINJA = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.dirname(views.__file__)),
extensions=['jinja2.ext.autoescape'])
class DebugViewHandler(webapp2.RequestHandler):
"""Request handler to display the debug view for a failure."""
def get(self):
"""Handles get requests to the /debug_view page.
GET Parameters:
test_run: The test run.
expectation: The expectation name.
"""
test_run = self.request.get('test_run')
expectation = self.request.get('expectation')
expected_path = ispy_utils.GetExpectationPath(expectation, 'expected.png')
actual_path = ispy_utils.GetFailurePath(test_run, expectation, 'actual.png')
data = {}
def _ImagePath(url):
return '/image?file_path=%s' % url
data['expected'] = _ImagePath(expected_path)
data['actual'] = _ImagePath(actual_path)
data['test_run'] = test_run
data['expectation'] = expectation
template = JINJA.get_template('debug_view.html')
self.response.write(template.render(data))
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Implementation of CloudBucket using Google Cloud Storage as the backend."""
import os
import sys
import cloudstorage
from common import cloud_bucket
class GoogleCloudStorageBucket(cloud_bucket.BaseCloudBucket):
"""Subclass of cloud_bucket.CloudBucket with actual GS commands."""
def __init__(self, bucket):
"""Initializes the bucket.
Args:
bucket: the name of the bucket to connect to.
"""
self.bucket = '/' + bucket
def _full_path(self, path):
return self.bucket + '/' + path.lstrip('/')
# override
def UploadFile(self, path, contents, content_type):
gs_file = cloudstorage.open(
self._full_path(path), 'w', content_type=content_type)
gs_file.write(contents)
gs_file.close()
# override
def DownloadFile(self, path):
try:
gs_file = cloudstorage.open(self._full_path(path), 'r')
r = gs_file.read()
gs_file.close()
except Exception as e:
raise Exception('%s: %s' % (self._full_path(path), str(e)))
return r
# override
def UpdateFile(self, path, contents):
if not self.FileExists(path):
raise cloud_bucket.FileNotFoundError
gs_file = cloudstorage.open(self._full_path(path), 'w')
gs_file.write(contents)
gs_file.close()
# override
def RemoveFile(self, path):
cloudstorage.delete(self._full_path(path))
# override
def FileExists(self, path):
try:
cloudstorage.stat(self._full_path(path))
except cloudstorage.NotFoundError:
return False
return True
# override
def GetImageURL(self, path):
return '/image?file_path=%s' % path
# override
def GetAllPaths(self, prefix, max_keys=None, marker=None, delimiter=None):
return (f.filename[len(self.bucket) + 1:] for f in
cloudstorage.listbucket(self.bucket, prefix=prefix,
max_keys=max_keys, marker=marker, delimiter=delimiter))
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Request handler to display an image from Google Cloud Storage."""
import json
import os
import sys
import webapp2
from common import cloud_bucket
from common import constants
import gs_bucket
class ImageHandler(webapp2.RequestHandler):
"""A request handler to avoid the Same-Origin problem in the debug view."""
def get(self):
"""Handles get requests to the ImageHandler.
GET Parameters:
file_path: A path to an image resource in Google Cloud Storage.
"""
file_path = self.request.get('file_path')
if not file_path:
self.error(404)
return
bucket = gs_bucket.GoogleCloudStorageBucket(constants.BUCKET)
try:
image = bucket.DownloadFile(file_path)
except cloud_bucket.FileNotFoundError:
self.error(404)
else:
self.response.headers['Content-Type'] = 'image/png'
self.response.out.write(image)
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Request handler to serve the main_view page."""
import jinja2
import json
import os
import re
import sys
import webapp2
import ispy_api
from common import constants
from common import ispy_utils
import gs_bucket
import views
JINJA = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.dirname(views.__file__)),
extensions=['jinja2.ext.autoescape'])
class MainViewHandler(webapp2.RequestHandler):
"""Request handler to serve the main_view page."""
def get(self):
"""Handles a get request to the main_view page.
If the test_run parameter is specified, then a page displaying all of
the failed runs in the test_run will be shown. Otherwise a view listing
all of the test_runs available for viewing will be displayed.
"""
test_run = self.request.get('test_run')
bucket = gs_bucket.GoogleCloudStorageBucket(constants.BUCKET)
ispy = ispy_utils.ISpyUtils(bucket)
# Load the view.
if test_run:
self._GetForTestRun(test_run, ispy)
return
self._GetAllTestRuns(ispy)
def _GetAllTestRuns(self, ispy):
"""Renders a list view of all of the test_runs available in GS.
Args:
ispy: An instance of ispy_api.ISpyApi.
"""
template = JINJA.get_template('list_view.html')
data = {}
max_keys = 1000
marker = 'failures/%s' % self.request.get('marker')
test_runs = list([path.split('/')[1] for path in
ispy.GetAllPaths('failures/', max_keys=max_keys,
marker=marker, delimiter='/')])
base_url = '/?test_run=%s'
next_url = '/?marker=%s' % test_runs[-1]
data['next_url'] = next_url
data['links'] = [(test_run, base_url % test_run) for test_run in test_runs]
self.response.write(template.render(data))
def _GetForTestRun(self, test_run, ispy):
"""Renders a sorted list of failure-rows for a given test_run.
This method will produce a list of failure-rows that are sorted
in descending order by number of different pixels.
Args:
test_run: The name of the test_run to render failure rows from.
ispy: An instance of ispy_api.ISpyApi.
"""
paths = set([path for path in ispy.GetAllPaths('failures/' + test_run)
if path.endswith('actual.png')])
can_rebaseline = ispy_api.ISpyApi(
ispy.cloud_bucket).CanRebaselineToTestRun(test_run)
rows = [self._CreateRow(test_run, path, ispy) for path in paths]
# Function that sorts by the different_pixels field in the failure-info.
def _Sorter(a, b):
return cmp(b['percent_different'],
a['percent_different'])
template = JINJA.get_template('main_view.html')
self.response.write(
template.render({'comparisons': sorted(rows, _Sorter),
'test_run': test_run,
'can_rebaseline': can_rebaseline}))
def _CreateRow(self, test_run, path, ispy):
"""Creates one failure-row.
This method builds a dictionary with the data necessary to display a
failure in the main_view html template.
Args:
test_run: The name of the test_run the failure is in.
path: A path to the failure's actual.png file.
ispy: An instance of ispy_api.ISpyApi.
Returns:
A dictionary with fields necessary to render a failure-row
in the main_view html template.
"""
res = {}
res['expectation'] = path.lstrip('/').split('/')[2]
res['test_run'] = test_run
res['info'] = json.loads(ispy.cloud_bucket.DownloadFile(
ispy_utils.GetFailurePath(res['test_run'], res['expectation'],
'info.txt')))
expected = ispy_utils.GetExpectationPath(
res['expectation'], 'expected.png')
diff = ispy_utils.GetFailurePath(test_run, res['expectation'], 'diff.png')
res['percent_different'] = res['info']['fraction_different'] * 100
res['expected_path'] = expected
res['diff_path'] = diff
res['actual_path'] = path
res['expected'] = ispy.cloud_bucket.GetImageURL(expected)
res['diff'] = ispy.cloud_bucket.GetImageURL(diff)
res['actual'] = ispy.cloud_bucket.GetImageURL(path)
return res
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Request Handler that updates the Expectation version."""
import webapp2
import ispy_api
from common import constants
import gs_bucket
class RebaselineHandler(webapp2.RequestHandler):
"""Request handler to allow test mask updates."""
def post(self):
"""Accepts post requests.
Expects a test_run as a parameter and updates the associated version file to
use the expectations associated with that test run.
"""
test_run = self.request.get('test_run')
# Fail if test_run parameter is missing.
if not test_run:
self.response.headers['Content-Type'] = 'json/application'
self.response.write(json.dumps(
{'error': '\'test_run\' must be supplied to rebaseline.'}))
return
# Otherwise, set up the utilities.
bucket = gs_bucket.GoogleCloudStorageBucket(constants.BUCKET)
ispy = ispy_api.ISpyApi(bucket)
# Update versions file.
ispy.RebaselineToTestRun(test_run)
# Redirect back to the sites list for the test run.
self.redirect('/?test_run=%s' % test_run)
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Request Handler to allow test mask updates."""
import webapp2
import re
import sys
import os
from common import constants
from common import image_tools
from common import ispy_utils
import gs_bucket
class UpdateMaskHandler(webapp2.RequestHandler):
"""Request handler to allow test mask updates."""
def post(self):
"""Accepts post requests.
This method will accept a post request containing device, site and
device_id parameters. This method takes the diff of the run
indicated by it's parameters and adds it to the mask of the run's
test. It will then delete the run it is applied to and redirect
to the device list view.
"""
test_run = self.request.get('test_run')
expectation = self.request.get('expectation')
# Short-circuit if a parameter is missing.
if not (test_run and expectation):
self.response.headers['Content-Type'] = 'json/application'
self.response.write(json.dumps(
{'error': '\'test_run\' and \'expectation\' must be '
'supplied to update a mask.'}))
return
# Otherwise, set up the utilities.
self.bucket = gs_bucket.GoogleCloudStorageBucket(constants.BUCKET)
self.ispy = ispy_utils.ISpyUtils(self.bucket)
# Short-circuit if the failure does not exist.
if not self.ispy.FailureExists(test_run, expectation):
self.response.headers['Content-Type'] = 'json/application'
self.response.write(json.dumps(
{'error': 'Could not update mask because failure does not exist.'}))
return
# Get the failure namedtuple (which also computes the diff).
failure = self.ispy.GetFailure(test_run, expectation)
# Upload the new mask in place of the original.
self.ispy.UpdateImage(
ispy_utils.GetExpectationPath(expectation, 'mask.png'),
image_tools.ConvertDiffToMask(failure.diff))
# Now that there is no diff for the two images, remove the failure.
self.ispy.RemoveFailure(test_run, expectation)
# Redirect back to the sites list for the test run.
self.redirect('/?test_run=%s' % test_run)
<html>
<head>
<title>Debug {{ expectation }}</title>
<script language="javascript">
var current = 0;
var toggle_interval = null;
var toggle = function() {
current = (current + 1) % 2;
var image = document.getElementById("screenshot");
image.src = (current ? "{{ actual }}" : "{{ expected }}");
var title = document.getElementById("text");
title.textContent = (current ? "Actual" : "Expected");
}
var setup = function() {
toggle();
toggle_interval = window.setInterval(toggle, 1000);
}
var manualToggle = function() {
if (toggle_interval != null)
window.clearInterval(toggle_interval);
toggle();
}
var confirmSubmit = function() {
return confirm("The area in this diff will be ignored in all future comparisions. Are you sure?");
}
</script>
</head>
<body onload="setup();">
<div>
<a href="javascript:void(0)" onclick="manualToggle();">Toggle</a>
&nbsp;&#8594;&nbsp;
<span id="text"></span>
</div>
<br>
<form action="/update_mask" method="post" onsubmit="return confirmSubmit();">
<input type="hidden" name="test_run" value="{{ test_run }}"/>
<input type="hidden" name="expectation" value="{{ expectation }}"/>
<input type="submit" value="Ignore similar diffs in the future"/>
</form>
<br>
<img id="screenshot" src=""/>
</body>
</html>
<!DOCTYPE html>
{% autoescape on %}
<html>
<head>
<title>I-Spy Test Runs</title>
<style>
#container {
display: table;
background-color:#DDD;
border: 1px solid #AAA;
width: 400px;
margin: 5px;
padding: 5px;
}
</style>
</head>
<body>
<h3>Test Runs</h3>
<form method="get">
<label for "prefix">Search Failures by Prefix:</label>
<input type="text" name="marker" id="prefix">
<input type="submit" value="submit">
</form>
<a href="{{ next_url }}">Next</a>
<div id="container">
{% for link in links %}
<div>
<a href="{{ link[1] }}">{{ link[0] }}</a>
</div>
{% endfor %}
</div>
</body>
</html>
{% endautoescape %}
<!DOCTYPE html>
<html>
<head>
<title>{{ test_run }} failures</title>
<style>
.image {
max-height: 325px;
max-width: 325px;
}
.cell {
padding-right: 25px;
padding-left: 25px;
float: left;
width: 20%;
}
.imagelink {
border-width: 0px;
}
.info {
padding-bottom: 25px;
}
.row {
padding-top: 10px;
padding-bottom: 10px;
border-bottom: 2px solid #888;
height: 350px;
}
</style>
<script language="javascript">
var confirmSubmit = function() {
return confirm("The screenshots generated with this version of chrome will be used as the expected images for future comparisions. Are you sure?");
}
</script>
</head>
<body>
<h3>Test Run: {{ test_run }}</h3>
{% if can_rebaseline %}
<form action="/rebaseline" method="post" onsubmit="return confirmSubmit();">
<input type="hidden" name="test_run" value="{{ test_run }}"/>
<input type="submit" value="Set as LKGR"/>
</form>
<br>
{% endif %}
{% if not comparisons %}
<h2>No failures.</h2>
{% endif %}
{% for comp in comparisons %}
<div class="row">
<div class="cell">
Diff&nbsp;&nbsp;({{ "%.1f"|format(comp['percent_different']) }}%)<br>
<a class="imagelink" href="{{ comp['diff'] }}">
<img class="image" src={{ comp['diff'] }}>
</a>
</div>
<div class="cell">
Expected<br>
<a class="imagelink" href="{{ comp['expected'] }}">
<img class="image" src={{ comp['expected'] }}>
</a>
</div>
<div class="cell">
Actual<br>
<a class="imagelink" href="{{ comp['actual'] }}">
<img class="image" src={{ comp['actual'] }}>
</a>
</div>
<div class="cell">
<br>
<div class="info">
{{ comp['expectation'] }}<br>
<a href='/debug_view?test_run={{ comp['test_run'] }}&expectation={{ comp['expectation'] }}'>Debug View</a>
</div>
</div>
</div>
{% endfor %}
</body>
</html>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment