diff --git a/IQA_pytorch/DISTS.py b/IQA_pytorch/DISTS.py index e1a83c3..d83ab6c 100644 --- a/IQA_pytorch/DISTS.py +++ b/IQA_pytorch/DISTS.py @@ -2,43 +2,53 @@ import os import sys import torch -from torchvision import models,transforms +from torchvision import models, transforms import torch.nn as nn import torch.nn.functional as F import inspect from .utils import downsample + class L2pooling(nn.Module): def __init__(self, filter_size=5, stride=2, channels=None, pad_off=0): + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') super(L2pooling, self).__init__() - self.padding = (filter_size - 2 )//2 + self.padding = (filter_size - 2) // 2 self.stride = stride self.channels = channels a = np.hanning(filter_size)[1:-1] # a = torch.hann_window(5,periodic=False) - g = torch.Tensor(a[:,None]*a[None,:]) - g = g/torch.sum(g) - self.register_buffer('filter', g[None,None,:,:].repeat((self.channels,1,1,1))) + g = torch.Tensor(a[:, None] * a[None, :]).to(device) + g = g / torch.sum(g) + self.register_buffer( + 'filter', g[None, None, :, :].repeat((self.channels, 1, 1, 1))) def forward(self, input): input = input**2 - out = F.conv2d(input, self.filter, stride=self.stride, padding=self.padding, groups=input.shape[1]) - return (out+1e-12).sqrt() + out = F.conv2d(input, + self.filter, + stride=self.stride, + padding=self.padding, + groups=input.shape[1]) + return (out + 1e-12).sqrt() + class DISTS(torch.nn.Module): ''' Refer to https://github.com/dingkeyan93/DISTS ''' def __init__(self, channels=3, load_weights=True): + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') assert channels == 3 super(DISTS, self).__init__() - vgg_pretrained_features = models.vgg16(pretrained=True).features + vgg_pretrained_features = models.vgg16( + pretrained=True).features.to(device) self.stage1 = torch.nn.Sequential() self.stage2 = torch.nn.Sequential() self.stage3 = torch.nn.Sequential() self.stage4 = torch.nn.Sequential() self.stage5 = torch.nn.Sequential() - for x in range(0,4): + for x in range(0, 4): self.stage1.add_module(str(x), vgg_pretrained_features[x]) self.stage2.add_module(str(4), L2pooling(channels=64)) for x in range(5, 9): @@ -52,25 +62,36 @@ def __init__(self, channels=3, load_weights=True): self.stage5.add_module(str(23), L2pooling(channels=512)) for x in range(24, 30): self.stage5.add_module(str(x), vgg_pretrained_features[x]) - + for param in self.parameters(): param.requires_grad = False - self.register_buffer("mean", torch.tensor([0.485, 0.456, 0.406]).view(1,-1,1,1)) - self.register_buffer("std", torch.tensor([0.229, 0.224, 0.225]).view(1,-1,1,1)) - - self.chns = [3,64,128,256,512,512] - self.register_parameter("alpha", nn.Parameter(torch.randn(1, sum(self.chns),1,1))) - self.register_parameter("beta", nn.Parameter(torch.randn(1, sum(self.chns),1,1))) - self.alpha.data.normal_(0.1,0.01) - self.beta.data.normal_(0.1,0.01) + self.register_buffer( + "mean", + torch.tensor([0.485, 0.456, 0.406]).to(device).view(1, -1, 1, 1)) + self.register_buffer( + "std", + torch.tensor([0.229, 0.224, 0.225]).to(device).view(1, -1, 1, 1)) + + self.chns = [3, 64, 128, 256, 512, 512] + self.register_parameter( + "alpha", + nn.Parameter(torch.randn(1, sum(self.chns), 1, 1).to(device))) + self.register_parameter( + "beta", + nn.Parameter(torch.randn(1, sum(self.chns), 1, 1).to(device))) + self.alpha.data.normal_(0.1, 0.01) + self.beta.data.normal_(0.1, 0.01) if load_weights: - weights = torch.load(os.path.abspath(os.path.join(inspect.getfile(DISTS),'..','weights/DISTS.pt'))) + weights = torch.load( + os.path.abspath( + os.path.join(inspect.getfile(DISTS), '..', + 'weights/DISTS.pt'))) self.alpha.data = weights['alpha'] self.beta.data = weights['beta'] def forward_once(self, x): - h = (x-self.mean)/self.std + h = (x - self.mean) / self.std h = self.stage1(h) h_relu1_2 = h h = self.stage2(h) @@ -81,39 +102,41 @@ def forward_once(self, x): h_relu4_3 = h h = self.stage5(h) h_relu5_3 = h - return [x,h_relu1_2, h_relu2_2, h_relu3_3, h_relu4_3, h_relu5_3] + return [x, h_relu1_2, h_relu2_2, h_relu3_3, h_relu4_3, h_relu5_3] - def forward(self, x, y, as_loss=True, resize = True): + def forward(self, x, y, as_loss=True, resize=True): + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') assert x.shape == y.shape if resize: x, y = downsample(x, y) if as_loss: feats0 = self.forward_once(x) - feats1 = self.forward_once(y) + feats1 = self.forward_once(y) else: with torch.no_grad(): feats0 = self.forward_once(x) - feats1 = self.forward_once(y) - dist1 = 0 - dist2 = 0 + feats1 = self.forward_once(y) + dist1 = 0 + dist2 = 0 c1 = 1e-6 c2 = 1e-6 w_sum = self.alpha.sum() + self.beta.sum() - alpha = torch.split(self.alpha/w_sum, self.chns, dim=1) - beta = torch.split(self.beta/w_sum, self.chns, dim=1) + alpha = torch.split(self.alpha / w_sum, self.chns, dim=1) + beta = torch.split(self.beta / w_sum, self.chns, dim=1) for k in range(len(self.chns)): - x_mean = feats0[k].mean([2,3], keepdim=True) - y_mean = feats1[k].mean([2,3], keepdim=True) - S1 = (2*x_mean*y_mean+c1)/(x_mean**2+y_mean**2+c1) - dist1 = dist1+(alpha[k]*S1).sum(1,keepdim=True) - - x_var = ((feats0[k]-x_mean)**2).mean([2,3], keepdim=True) - y_var = ((feats1[k]-y_mean)**2).mean([2,3], keepdim=True) - xy_cov = (feats0[k]*feats1[k]).mean([2,3],keepdim=True) - x_mean*y_mean - S2 = (2*xy_cov+c2)/(x_var+y_var+c2) - dist2 = dist2+(beta[k]*S2).sum(1,keepdim=True) - - score = 1 - (dist1+dist2).squeeze() + x_mean = feats0[k].mean([2, 3], keepdim=True) + y_mean = feats1[k].mean([2, 3], keepdim=True) + S1 = (2 * x_mean * y_mean + c1) / (x_mean**2 + y_mean**2 + c1) + dist1 = dist1 + (alpha[k].to(device) * S1).sum(1, keepdim=True) + + x_var = ((feats0[k] - x_mean)**2).mean([2, 3], keepdim=True) + y_var = ((feats1[k] - y_mean)**2).mean([2, 3], keepdim=True) + xy_cov = (feats0[k] * feats1[k]).mean( + [2, 3], keepdim=True) - x_mean * y_mean + S2 = (2 * xy_cov + c2) / (x_var + y_var + c2) + dist2 = dist2 + (beta[k].to(device) * S2).sum(1, keepdim=True) + + score = 1 - (dist1 + dist2).squeeze() if as_loss: return score.mean() else: @@ -141,4 +164,3 @@ def forward(self, x, y, as_loss=True, resize = True): score = model(ref, dist, as_loss=False) print('score: %.4f' % score.item()) # score: 0.3347 - diff --git a/IQA_pytorch/GMSD.py b/IQA_pytorch/GMSD.py index b21989c..36ed251 100644 --- a/IQA_pytorch/GMSD.py +++ b/IQA_pytorch/GMSD.py @@ -4,34 +4,48 @@ import numpy as np from torchvision import transforms + class GMSD(nn.Module): # Refer to http://www4.comp.polyu.edu.hk/~cslzhang/IQA/GMSD/GMSD.htm def __init__(self, channels=3): super(GMSD, self).__init__() + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') self.channels = channels - dx = (torch.Tensor([[1,0,-1],[1,0,-1],[1,0,-1]])/3.).unsqueeze(0).unsqueeze(0).repeat(channels,1,1,1) - dy = (torch.Tensor([[1,1,1],[0,0,0],[-1,-1,-1]])/3.).unsqueeze(0).unsqueeze(0).repeat(channels,1,1,1) + dx = (torch.Tensor([[1, 0, -1], [1, 0, -1], [1, 0, -1]]).to(device) / + 3.).unsqueeze(0).unsqueeze(0).repeat(channels, 1, 1, 1) + dy = (torch.Tensor([[1, 1, 1], [0, 0, 0], [-1, -1, -1]]).to(device) / + 3.).unsqueeze(0).unsqueeze(0).repeat(channels, 1, 1, 1) self.dx = nn.Parameter(dx, requires_grad=False) self.dy = nn.Parameter(dy, requires_grad=False) - self.aveKernel = nn.Parameter(torch.ones(channels,1,2,2)/4., requires_grad=False) + self.aveKernel = nn.Parameter( + torch.ones(channels, 1, 2, 2).to(device) / 4., requires_grad=False) def gmsd(self, img1, img2, T=170): - Y1 = F.conv2d(img1, self.aveKernel, stride=2, padding =0, groups = self.channels) - Y2 = F.conv2d(img2, self.aveKernel, stride=2, padding =0, groups = self.channels) + Y1 = F.conv2d(img1, + self.aveKernel, + stride=2, + padding=0, + groups=self.channels) + Y2 = F.conv2d(img2, + self.aveKernel, + stride=2, + padding=0, + groups=self.channels) + + IxY1 = F.conv2d(Y1, self.dx, stride=1, padding=1, groups=self.channels) + IyY1 = F.conv2d(Y1, self.dy, stride=1, padding=1, groups=self.channels) + gradientMap1 = torch.sqrt(IxY1**2 + IyY1**2 + 1e-12) - IxY1 = F.conv2d(Y1, self.dx, stride=1, padding =1, groups = self.channels) - IyY1 = F.conv2d(Y1, self.dy, stride=1, padding =1, groups = self.channels) - gradientMap1 = torch.sqrt(IxY1**2 + IyY1**2+1e-12) + IxY2 = F.conv2d(Y2, self.dx, stride=1, padding=1, groups=self.channels) + IyY2 = F.conv2d(Y2, self.dy, stride=1, padding=1, groups=self.channels) + gradientMap2 = torch.sqrt(IxY2**2 + IyY2**2 + 1e-12) - IxY2 = F.conv2d(Y2, self.dx, stride=1, padding =1, groups = self.channels) - IyY2 = F.conv2d(Y2, self.dy, stride=1, padding =1, groups = self.channels) - gradientMap2 = torch.sqrt(IxY2**2 + IyY2**2+1e-12) - - quality_map = (2*gradientMap1*gradientMap2 + T)/(gradientMap1**2+gradientMap2**2 + T) - score = torch.std(quality_map.view(quality_map.shape[0],-1),dim=1) + quality_map = (2 * gradientMap1 * gradientMap2 + + T) / (gradientMap1**2 + gradientMap2**2 + T) + score = torch.std(quality_map.view(quality_map.shape[0], -1), dim=1) return score - + def forward(self, y, x, as_loss=True): assert x.shape == y.shape x = x * 255 @@ -44,6 +58,7 @@ def forward(self, y, x, as_loss=True): score = self.gmsd(x, y) return score + if __name__ == '__main__': from PIL import Image import argparse @@ -64,4 +79,3 @@ def forward(self, y, x, as_loss=True): score = model(ref, dist, as_loss=False) print('score: %.4f' % score.item()) # score: 0.1907 - diff --git a/IQA_pytorch/LPIPSvgg.py b/IQA_pytorch/LPIPSvgg.py index 26eae33..dfeffd9 100644 --- a/IQA_pytorch/LPIPSvgg.py +++ b/IQA_pytorch/LPIPSvgg.py @@ -2,7 +2,7 @@ import os import sys import torch -from torchvision import models,transforms +from torchvision import models, transforms import torch.nn as nn import torch.nn.functional as F import inspect @@ -11,16 +11,17 @@ class LPIPSvgg(torch.nn.Module): def __init__(self, channels=3): # Refer to https://github.com/richzhang/PerceptualSimilarity - + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') assert channels == 3 super(LPIPSvgg, self).__init__() - vgg_pretrained_features = models.vgg16(pretrained=True).features + vgg_pretrained_features = models.vgg16( + pretrained=True).features.to(device) self.stage1 = torch.nn.Sequential() self.stage2 = torch.nn.Sequential() self.stage3 = torch.nn.Sequential() self.stage4 = torch.nn.Sequential() self.stage5 = torch.nn.Sequential() - for x in range(0,4): + for x in range(0, 4): self.stage1.add_module(str(x), vgg_pretrained_features[x]) for x in range(4, 9): self.stage2.add_module(str(x), vgg_pretrained_features[x]) @@ -30,19 +31,26 @@ def __init__(self, channels=3): self.stage4.add_module(str(x), vgg_pretrained_features[x]) for x in range(23, 30): self.stage5.add_module(str(x), vgg_pretrained_features[x]) - + for param in self.parameters(): param.requires_grad = False - self.register_buffer("mean", torch.tensor([0.485, 0.456, 0.406]).view(1,-1,1,1)) - self.register_buffer("std", torch.tensor([0.229, 0.224, 0.225]).view(1,-1,1,1)) + self.register_buffer( + "mean", + torch.tensor([0.485, 0.456, 0.406]).to(device).view(1, -1, 1, 1)) + self.register_buffer( + "std", + torch.tensor([0.229, 0.224, 0.225]).to(device).view(1, -1, 1, 1)) - self.chns = [64,128,256,512,512] - self.weights = torch.load(os.path.abspath(os.path.join(inspect.getfile(LPIPSvgg),'..','weights/LPIPSvgg.pt'))) + self.chns = [64, 128, 256, 512, 512] + self.weights = torch.load( + os.path.abspath( + os.path.join(inspect.getfile(LPIPSvgg), '..', + 'weights/LPIPSvgg.pt'))) self.weights = list(self.weights.items()) - + def forward_once(self, x): - h = (x-self.mean)/self.std + h = (x - self.mean) / self.std h = self.stage1(h) h_relu1_2 = h h = self.stage2(h) @@ -62,19 +70,21 @@ def forward(self, x, y, as_loss=True): assert x.shape == y.shape if as_loss: feats0 = self.forward_once(x) - feats1 = self.forward_once(y) + feats1 = self.forward_once(y) else: with torch.no_grad(): feats0 = self.forward_once(x) - feats1 = self.forward_once(y) - score = 0 + feats1 = self.forward_once(y) + score = 0 for k in range(len(self.chns)): - score = score + (self.weights[k][1]*(feats0[k]-feats1[k])**2).mean([2,3]).sum(1) + score = score + (self.weights[k][1] * + (feats0[k] - feats1[k])**2).mean([2, 3]).sum(1) if as_loss: return score.mean() else: return score + if __name__ == '__main__': from PIL import Image import argparse @@ -95,4 +105,3 @@ def forward(self, x, y, as_loss=True): score = model(ref, dist, as_loss=False) print('score: %.4f' % score.item()) # score: 0.5435 - diff --git a/IQA_pytorch/NLPD.py b/IQA_pytorch/NLPD.py index a61fbf0..49661b5 100644 --- a/IQA_pytorch/NLPD.py +++ b/IQA_pytorch/NLPD.py @@ -8,7 +8,8 @@ [0.0200, 0.1000, 0.1600, 0.1000, 0.0200], [0.0125, 0.0625, 0.1000, 0.0625, 0.0125], [0.0025, 0.0125, 0.0200, 0.0125, 0.0025]], - dtype=np.float32) + dtype=np.float32) + class NLPD(nn.Module): """ @@ -17,55 +18,70 @@ class NLPD(nn.Module): https://github.com/alexhepburn/nlpd-tensorflow """ def __init__(self, channels=3, k=6, filt=None): + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') super(NLPD, self).__init__() if filt is None: filt = np.reshape(np.tile(LAPLACIAN_FILTER, (channels, 1, 1)), (channels, 1, 5, 5)) self.k = k self.channels = channels - self.filt = nn.Parameter(torch.Tensor(filt), requires_grad=False) + self.filt = nn.Parameter(torch.Tensor(filt).to(device), + requires_grad=False) self.dn_filts, self.sigmas = self.DN_filters() self.pad_one = nn.ReflectionPad2d(1) self.pad_two = nn.ReflectionPad2d(2) - self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', + self.upsample = nn.Upsample(scale_factor=2, + mode='bilinear', align_corners=True) def DN_filters(self): + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') sigmas = [0.0248, 0.0185, 0.0179, 0.0191, 0.0220, 0.2782] dn_filts = [] - dn_filts.append(torch.Tensor(np.reshape([[0, 0.1011, 0], - [0.1493, 0, 0.1460], - [0, 0.1015, 0.]]*self.channels, - (self.channels, 1, 3, 3)).astype(np.float32))) - - dn_filts.append(torch.Tensor(np.reshape([[0, 0.0757, 0], - [0.1986, 0, 0.1846], - [0, 0.0837, 0]]*self.channels, - (self.channels, 1, 3, 3)).astype(np.float32))) - - dn_filts.append(torch.Tensor(np.reshape([[0, 0.0477, 0], - [0.2138, 0, 0.2243], - [0, 0.0467, 0]]*self.channels, - (self.channels, 1, 3, 3)).astype(np.float32))) - - dn_filts.append(torch.Tensor(np.reshape([[0, 0, 0], - [0.2503, 0, 0.2616], - [0, 0, 0]]*self.channels, - (self.channels, 1, 3, 3)).astype(np.float32))) - - dn_filts.append(torch.Tensor(np.reshape([[0, 0, 0], - [0.2598, 0, 0.2552], - [0, 0, 0]]*self.channels, - (self.channels, 1, 3, 3)).astype(np.float32))) - - dn_filts.append(torch.Tensor(np.reshape([[0, 0, 0], - [0.2215, 0, 0.0717], - [0, 0, 0]]*self.channels, - (self.channels, 1, 3, 3)).astype(np.float32))) - dn_filts = nn.ParameterList([nn.Parameter(x, requires_grad=False) - for x in dn_filts]) - sigmas = nn.ParameterList([nn.Parameter(torch.Tensor(np.array(x)), - requires_grad=False) for x in sigmas]) + dn_filts.append( + torch.Tensor( + np.reshape( + [[0, 0.1011, 0], [0.1493, 0, 0.1460], [0, 0.1015, 0.]] * + self.channels, + (self.channels, 1, 3, 3)).astype(np.float32)).to(device)) + + dn_filts.append( + torch.Tensor( + np.reshape( + [[0, 0.0757, 0], [0.1986, 0, 0.1846], [0, 0.0837, 0]] * + self.channels, + (self.channels, 1, 3, 3)).astype(np.float32)).to(device)) + + dn_filts.append( + torch.Tensor( + np.reshape( + [[0, 0.0477, 0], [0.2138, 0, 0.2243], [0, 0.0467, 0]] * + self.channels, + (self.channels, 1, 3, 3)).astype(np.float32)).to(device)) + + dn_filts.append( + torch.Tensor( + np.reshape([[0, 0, 0], [0.2503, 0, 0.2616], [0, 0, 0]] * + self.channels, (self.channels, 1, 3, 3)).astype( + np.float32)).to(device)) + + dn_filts.append( + torch.Tensor( + np.reshape([[0, 0, 0], [0.2598, 0, 0.2552], [0, 0, 0]] * + self.channels, (self.channels, 1, 3, 3)).astype( + np.float32)).to(device)) + + dn_filts.append( + torch.Tensor( + np.reshape([[0, 0, 0], [0.2215, 0, 0.0717], [0, 0, 0]] * + self.channels, (self.channels, 1, 3, 3)).astype( + np.float32)).to(device)) + dn_filts = nn.ParameterList( + [nn.Parameter(x, requires_grad=False) for x in dn_filts]) + sigmas = nn.ParameterList([ + nn.Parameter(torch.Tensor(np.array(x)).to(device), + requires_grad=False) for x in sigmas + ]) return dn_filts, sigmas def pyramid(self, im): @@ -73,30 +89,38 @@ def pyramid(self, im): J = im pyr = [] for i in range(0, self.k): - I = F.conv2d(self.pad_two(J), self.filt, stride=2, padding=0, + I = F.conv2d(self.pad_two(J), + self.filt, + stride=2, + padding=0, groups=self.channels) I_up = self.upsample(I) - I_up_conv = F.conv2d(self.pad_two(I_up), self.filt, stride=1, - padding=0, groups=self.channels) + I_up_conv = F.conv2d(self.pad_two(I_up), + self.filt, + stride=1, + padding=0, + groups=self.channels) if J.size() != I_up_conv.size(): I_up_conv = F.interpolate(I_up_conv, [J.size(2), J.size(3)]) out = J - I_up_conv - out_conv = F.conv2d(self.pad_one(torch.abs(out)), self.dn_filts[i], - stride=1, groups=self.channels) - out_norm = out / (self.sigmas[i]+out_conv) + out_conv = F.conv2d(self.pad_one(torch.abs(out)), + self.dn_filts[i], + stride=1, + groups=self.channels) + out_norm = out / (self.sigmas[i] + out_conv) pyr.append(out_norm) J = I return pyr def nlpd(self, x1, x2): y1 = self.pyramid(x1) - y2 = self.pyramid(x2) + y2 = self.pyramid(x2) total = [] for z1, z2 in zip(y1, y2): - diff = (z1 - z2) ** 2 + diff = (z1 - z2)**2 sqrt = torch.sqrt(torch.mean(diff, (1, 2, 3))) total.append(sqrt) - score = torch.stack(total,dim=1).mean(1) + score = torch.stack(total, dim=1).mean(1) return score def forward(self, y, x, as_loss=True): @@ -109,6 +133,7 @@ def forward(self, y, x, as_loss=True): score = self.nlpd(x, y) return score + if __name__ == '__main__': from PIL import Image import argparse @@ -120,12 +145,12 @@ def forward(self, y, x, as_loss=True): args = parser.parse_args() device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') - + ref = prepare_image(Image.open(args.ref).convert("L")).to(device) dist = prepare_image(Image.open(args.dist).convert("L")).to(device) - + model = NLPD(channels=1).to(device) score = model(dist, ref, as_loss=False) print('score: %.4f' % score.item()) - # score: 0.4016 \ No newline at end of file + # score: 0.4016