adversarial_AIRBERT/r2r_src/eval.py
Shizhe Chen bbeb69aa5f init
2021-08-02 13:04:04 +00:00

113 lines
4.8 KiB
Python

''' Evaluation of agent trajectories '''
import json
import os
import sys
from collections import defaultdict
import networkx as nx
import numpy as np
import pprint
# Module-level pretty-printer (4-space indent); not referenced by the Evaluation class below.
pp = pprint.PrettyPrinter(indent=4)
from env import R2RBatch
from utils import load_datasets, load_nav_graphs, ndtw_graphload, DTW
from agent import BaseAgent
class Evaluation(object):
    ''' Score agent trajectories against ground-truth paths.

    Results submission format:
        [{'instr_id': string, 'trajectory':[(viewpoint_id, heading_rads, elevation_rads),] } ] '''

    def __init__(self, splits, scans, tok):
        ''' Load the ground truth for `splits` and precompute graph distances.

        splits: iterable of split names; each is passed on its own to
                `load_datasets`.
        scans:  optional collection of scan ids. Items whose 'scan' is not in
                it are skipped; None keeps every item.
        tok:    stored on the instance unchanged; not used by this class.
        '''
        # A trajectory counts as a success when its remaining distance to the
        # goal is below this value (presumably meters -- confirm against the
        # edge weights produced by load_nav_graphs).
        self.error_margin = 3.0
        self.splits = splits
        self.tok = tok
        self.gt = {}  # str(path_id) -> dataset item
        # Per-trajectory metrics; reset at the start of every score() call.
        self.scores = defaultdict(list)

        instr_ids = set()
        scan_ids = set()
        for split in splits:
            for item in load_datasets([split]):
                if scans is not None and item['scan'] not in scans:
                    continue
                self.gt[str(item['path_id'])] = item
                scan_ids.add(item['scan'])
                # One id per instruction: '<path_id>_<instruction index>'.
                instr_ids.update(
                    '%s_%d' % (item['path_id'], i)
                    for i in range(len(item['instructions']))
                )
        self.scans = scan_ids
        self.instr_ids = instr_ids

        self.graphs = load_nav_graphs(self.scans)
        self.distances = {}
        for scan, G in self.graphs.items():  # compute all shortest paths
            self.distances[scan] = dict(nx.all_pairs_dijkstra_path_length(G))

    def _get_nearest(self, scan, goal_id, path):
        ''' Return the viewpoint id on `path` with the smallest distance to
        `goal_id`. Ties go to the earliest such viewpoint. '''
        to_goal = self.distances[scan]
        near_id = path[0][0]
        near_d = to_goal[near_id][goal_id]
        for step in path:
            d = to_goal[step[0]][goal_id]
            if d < near_d:
                near_id = step[0]
                near_d = d
        return near_id

    def _score_item(self, instr_id, path):
        ''' Calculate error based on the final position in trajectory, and also
        the closest position (oracle stopping rule).
        The path contains [view_id, angle, vofv].

        Appends one value to each of the 'nav_errors', 'oracle_errors',
        'trajectory_steps', 'trajectory_lengths' and 'shortest_lengths' lists
        in self.scores. '''
        # instr_id is '<path_id>_<index>' (see __init__). Strip only the last
        # component so path ids that themselves contain '_' still resolve.
        gt = self.gt[instr_id.rsplit('_', 1)[0]]
        dist = self.distances[gt['scan']]
        start = gt['path'][0]
        assert start == path[0][0], 'Result trajectories should include the start position'
        goal = gt['path'][-1]
        final_position = path[-1][0]  # the first of [view_id, angle, vofv]
        nearest_position = self._get_nearest(gt['scan'], goal, path)

        self.scores['nav_errors'].append(dist[final_position][goal])
        self.scores['oracle_errors'].append(dist[nearest_position][goal])
        self.scores['trajectory_steps'].append(len(path) - 1)

        distance = 0  # length of the path in meters
        for prev, curr in zip(path, path[1:]):
            distance += dist[prev[0]][curr[0]]
        self.scores['trajectory_lengths'].append(distance)
        self.scores['shortest_lengths'].append(dist[start][goal])

    def score(self, output_file):
        ''' Evaluate each agent trajectory based on how close it got to the goal location.

        output_file: either a path (str or os.PathLike) to a JSON file in the
                     results submission format, or the already-loaded list of
                     result dicts.

        Returns (score_summary, self.scores): a dict of aggregate metrics and
        the per-trajectory metric lists it was computed from.

        Results whose 'instr_id' is unknown, or was already scored in this
        call, are ignored. Unless 'train' is one of the splits, every expected
        instruction id must be present (checked with assert). '''
        self.scores = defaultdict(list)
        instr_ids = set(self.instr_ids)
        if isinstance(output_file, (str, os.PathLike)):
            with open(output_file) as f:
                results = json.load(f)
        else:
            results = output_file

        print('result length', len(results))
        for item in results:
            # Check against expected ids
            if item['instr_id'] in instr_ids:
                instr_ids.remove(item['instr_id'])
                self._score_item(item['instr_id'], item['trajectory'])

        if 'train' not in self.splits:  # Exclude the training from this. (Because training eval may be partial)
            assert len(instr_ids) == 0, 'Missing %d of %d instruction ids from %s - not in %s'\
                % (len(instr_ids), len(self.instr_ids), ",".join(self.splits), output_file)
            assert len(self.scores['nav_errors']) == len(self.instr_ids)

        nav_errors = self.scores['nav_errors']
        oracle_errors = self.scores['oracle_errors']
        score_summary = {
            'nav_error': np.average(nav_errors),
            'oracle_error': np.average(oracle_errors),
            'steps': np.average(self.scores['trajectory_steps']),
            'lengths': np.average(self.scores['trajectory_lengths'])
        }

        num_successes = len([e for e in nav_errors if e < self.error_margin])
        score_summary['success_rate'] = float(num_successes) / float(len(nav_errors))
        oracle_successes = len([e for e in oracle_errors if e < self.error_margin])
        score_summary['oracle_rate'] = float(oracle_successes) / float(len(oracle_errors))

        # Success weighted by path length: shortest / max(shortest, taken) for
        # successful trajectories, 0 otherwise. The 0.01 floor avoids 0/0.
        spl = [
            float(error < self.error_margin) * shortest / max(shortest, taken, 0.01)
            for error, taken, shortest in zip(
                nav_errors,
                self.scores['trajectory_lengths'],
                self.scores['shortest_lengths'],
            )
        ]
        score_summary['spl'] = np.average(spl)
        return score_summary, self.scores