''' Batched REVERIE navigation environment ''' import json import os import numpy as np import math import random import networkx as nx from collections import defaultdict import copy from glob import glob import MatterSim from utils.data import load_nav_graphs, new_simulator from utils.data import angle_feature, get_all_point_angle_feature with open('./node_region.json') as fp: node_region = json.load(fp) with open('region2objs.json') as fp: region2objs = json.load(fp) with open('vp2objs.json') as fp: vp2objs = json.load(fp) class EnvBatch(object): ''' A simple wrapper for a batch of MatterSim environments, using discretized viewpoints and pretrained features ''' def __init__(self, connectivity_dir, scan_data_dir=None, feat_db=None, batch_size=100): """ 1. Load pretrained image feature 2. Init the Simulator. :param feat_db: The name of file stored the feature. :param batch_size: Used to create the simulator list. """ self.feat_db = feat_db self.image_w = 640 self.image_h = 480 self.vfov = 60 self.sims = [] for i in range(batch_size): sim = MatterSim.Simulator() if scan_data_dir: sim.setDatasetPath(scan_data_dir) sim.setNavGraphPath(connectivity_dir) sim.setRenderingEnabled(False) sim.setDiscretizedViewingAngles(True) # Set increment/decrement to 30 degree. (otherwise by radians) sim.setCameraResolution(self.image_w, self.image_h) sim.setCameraVFOV(math.radians(self.vfov)) sim.setBatchSize(1) sim.initialize() self.sims.append(sim) def _make_id(self, scanId, viewpointId): return scanId + '_' + viewpointId def newEpisodes(self, scanIds, viewpointIds, headings): for i, (scanId, viewpointId, heading) in enumerate(zip(scanIds, viewpointIds, headings)): self.sims[i].newEpisode([scanId], [viewpointId], [heading], [0]) def getStates(self): """ Get list of states augmented with precomputed image features. rgb field will be empty. Agent's current view [0-35] (set only when viewing angles are discretized) [0-11] looking down, [12-23] looking at horizon, [24-35] looking up :return: [ ((36, 2048), sim_state) ] * batch_size """ feature_states = [] for i, sim in enumerate(self.sims): state = sim.getState()[0] feature = self.feat_db.get_image_feature(state.scanId, state.location.viewpointId) feature_states.append((feature, state)) return feature_states def makeActions(self, actions): ''' Take an action using the full state dependent action interface (with batched input). Every action element should be an (index, heading, elevation) tuple. ''' for i, (index, heading, elevation) in enumerate(actions): self.sims[i].makeAction([index], [heading], [elevation]) class ReverieObjectNavBatch(object): ''' Implements the REVERIE navigation task, using discretized viewpoints and pretrained features ''' def __init__( self, view_db, obj_db, instr_data, connectivity_dir, obj2vps, multi_endpoints=False, multi_startpoints=False, batch_size=64, angle_feat_size=4, max_objects=None, seed=0, name=None, sel_data_idxs=None, ): self.env = EnvBatch(connectivity_dir, feat_db=view_db, batch_size=batch_size) self.obj_db = obj_db self.data = instr_data self.scans = set([x['scan'] for x in self.data]) self.multi_endpoints = multi_endpoints self.multi_startpoints = multi_startpoints self.obj2vps = obj2vps # {scan_objid: vp_list} (objects can be viewed at the viewpoints) self.connectivity_dir = connectivity_dir self.batch_size = batch_size self.angle_feat_size = angle_feat_size self.max_objects = max_objects self.name = name for item in self.data: if 'objId' in item and item['objId'] is not None: item['end_vps'] = self.obj2vps['%s_%s'%(item['scan'], item['objId'])] self.gt_trajs = self._get_gt_trajs(self.data) # for evaluation # in validation, we would split the data if sel_data_idxs is not None: t_split, n_splits = sel_data_idxs ndata_per_split = len(self.data) // n_splits start_idx = ndata_per_split * t_split if t_split == n_splits - 1: end_idx = None else: end_idx = start_idx + ndata_per_split self.data = self.data[start_idx: end_idx] # use different seeds in different processes to shuffle data self.seed = seed random.seed(self.seed) random.shuffle(self.data) self.ix = 0 self._load_nav_graphs() self.sim = new_simulator(self.connectivity_dir) self.angle_feature = get_all_point_angle_feature(self.sim, self.angle_feat_size) self.buffered_state_dict = {} print('%s loaded with %d instructions, using splits: %s' % ( self.__class__.__name__, len(self.data), self.name)) def _get_gt_trajs(self, data): gt_trajs = { x['instr_id']: (x['scan'], x['path'], x['objId']) \ for x in data if 'objId' in x and x['objId'] is not None } return gt_trajs def size(self): return len(self.data) def _load_nav_graphs(self): """ load graph from self.scan, Store the graph {scan_id: graph} in self.graphs Store the shortest path {scan_id: {view_id_x: {view_id_y: [path]} } } in self.paths Store the distances in self.distances. (Structure see above) Load connectivity graph for each scan, useful for reasoning about shortest paths :return: None """ print('Loading navigation graphs for %d scans' % len(self.scans)) self.graphs = load_nav_graphs(self.connectivity_dir, self.scans) self.shortest_paths = {} for scan, G in self.graphs.items(): # compute all shortest paths self.shortest_paths[scan] = dict(nx.all_pairs_dijkstra_path(G)) self.shortest_distances = {} for scan, G in self.graphs.items(): # compute all shortest paths self.shortest_distances[scan] = dict(nx.all_pairs_dijkstra_path_length(G)) def _next_minibatch(self, batch_size=None, **kwargs): """ Store the minibach in 'self.batch' """ if batch_size is None: batch_size = self.batch_size batch = self.data[self.ix: self.ix+batch_size] if len(batch) < batch_size: random.shuffle(self.data) self.ix = batch_size - len(batch) batch += self.data[:self.ix] else: self.ix += batch_size self.batch = batch start_vps = [x['path'][0] for x in self.batch] end_vps = [x['path'][-1] for x in self.batch] if self.multi_startpoints: for i, item in enumerate(batch): cand_vps = [] for cvp, cpath in self.shortest_paths[item['scan']][end_vps[i]].items(): if len(cpath) >= 4 and len(cpath) <= 7: cand_vps.append(cvp) if len(cand_vps) > 0: start_vps[i] = cand_vps[np.random.randint(len(cand_vps))] if self.multi_endpoints: for i, item in enumerate(batch): end_vp = item['end_vps'][np.random.randint(len(item['end_vps']))] end_vps[i] = end_vp if self.multi_startpoints or self.multi_endpoints: batch = copy.deepcopy(self.batch) for i, item in enumerate(batch): item['path'] = self.shortest_paths[item['scan']][start_vps[i]][end_vps[i]] self.batch = batch def reset_epoch(self, shuffle=False): ''' Reset the data index to beginning of epoch. Primarily for testing. You must still call reset() for a new episode. ''' if shuffle: random.shuffle(self.data) self.ix = 0 def make_candidate(self, feature, scanId, viewpointId, viewId): def _loc_distance(loc): return np.sqrt(loc.rel_heading ** 2 + loc.rel_elevation ** 2) base_heading = (viewId % 12) * math.radians(30) base_elevation = (viewId // 12 - 1) * math.radians(30) adj_dict = {} long_id = "%s_%s" % (scanId, viewpointId) if long_id not in self.buffered_state_dict: for ix in range(36): if ix == 0: self.sim.newEpisode([scanId], [viewpointId], [0], [math.radians(-30)]) elif ix % 12 == 0: self.sim.makeAction([0], [1.0], [1.0]) else: self.sim.makeAction([0], [1.0], [0]) state = self.sim.getState()[0] assert state.viewIndex == ix # Heading and elevation for the viewpoint center heading = state.heading - base_heading elevation = state.elevation - base_elevation visual_feat = feature[ix] # get adjacent locations for j, loc in enumerate(state.navigableLocations[1:]): # if a loc is visible from multiple view, use the closest # view (in angular distance) as its representation distance = _loc_distance(loc) # Heading and elevation for for the loc loc_heading = heading + loc.rel_heading loc_elevation = elevation + loc.rel_elevation angle_feat = angle_feature(loc_heading, loc_elevation, self.angle_feat_size) if (loc.viewpointId not in adj_dict or distance < adj_dict[loc.viewpointId]['distance']): adj_dict[loc.viewpointId] = { 'heading': loc_heading, 'elevation': loc_elevation, "normalized_heading": state.heading + loc.rel_heading, "normalized_elevation": state.elevation + loc.rel_elevation, 'scanId': scanId, 'viewpointId': loc.viewpointId, # Next viewpoint id 'pointId': ix, 'distance': distance, 'idx': j + 1, 'feature': np.concatenate((visual_feat, angle_feat), -1), 'position': (loc.x, loc.y, loc.z), } candidate = list(adj_dict.values()) self.buffered_state_dict[long_id] = [ {key: c[key] for key in ['normalized_heading', 'normalized_elevation', 'scanId', 'viewpointId', 'pointId', 'idx', 'position']} for c in candidate ] return candidate else: candidate = self.buffered_state_dict[long_id] candidate_new = [] for c in candidate: c_new = c.copy() ix = c_new['pointId'] visual_feat = feature[ix] c_new['heading'] = c_new['normalized_heading'] - base_heading c_new['elevation'] = c_new['normalized_elevation'] - base_elevation angle_feat = angle_feature(c_new['heading'], c_new['elevation'], self.angle_feat_size) c_new['feature'] = np.concatenate((visual_feat, angle_feat), -1) c_new.pop('normalized_heading') c_new.pop('normalized_elevation') candidate_new.append(c_new) return candidate_new def _get_obs(self): obs = [] for i, (feature, state) in enumerate(self.env.getStates()): item = self.batch[i] base_view_id = state.viewIndex # Full features candidate = self.make_candidate(feature, state.scanId, state.location.viewpointId, state.viewIndex) # [visual_feature, angle_feature] for views feature = np.concatenate((feature, self.angle_feature[base_view_id]), -1) # objects obj_img_fts, obj_ang_fts, obj_box_fts, obj_ids = self.obj_db.get_object_feature( state.scanId, state.location.viewpointId, state.heading, state.elevation, self.angle_feat_size, max_objects=self.max_objects ) ob = { 'instr_id' : item['instr_id'], 'scan' : state.scanId, 'viewpoint' : state.location.viewpointId, 'viewIndex' : state.viewIndex, 'position': (state.location.x, state.location.y, state.location.z), 'heading' : state.heading, 'elevation' : state.elevation, 'feature' : feature, 'candidate': candidate, 'obj_img_fts': obj_img_fts, 'obj_ang_fts': obj_ang_fts, 'obj_box_fts': obj_box_fts, 'obj_ids': obj_ids, 'navigableLocations' : state.navigableLocations, 'instruction' : item['instruction'], 'instr_encoding': item['instr_encoding'], 'gt_found' : item['found'], 'gt_path' : item['path'], 'gt_end_vps': item.get('end_vps', []), 'gt_obj_id': item['objId'], 'path_id' : item['path_id'] } # RL reward. The negative distance between the state and the final state # There are multiple gt end viewpoints on REVERIE. if ob['instr_id'] in self.gt_trajs: gt_objid = self.gt_trajs[ob['instr_id']][-1] min_dist = np.inf for vp in self.obj2vps['%s_%s'%(ob['scan'], str(gt_objid))]: try: min_dist = min(min_dist, self.shortest_distances[ob['scan']][ob['viewpoint']][vp]) except: print(ob['scan'], ob['viewpoint'], vp) exit(0) ob['distance'] = min_dist else: ob['distance'] = 0 obs.append(ob) return obs def reset(self, **kwargs): ''' Load a new minibatch / episodes. ''' self._next_minibatch(**kwargs) scanIds = [item['scan'] for item in self.batch] viewpointIds = [item['path'][0] for item in self.batch] headings = [item['heading'] for item in self.batch] self.env.newEpisodes(scanIds, viewpointIds, headings) return self._get_obs() def step(self, actions): ''' Take action (same interface as makeActions) ''' self.env.makeActions(actions) return self._get_obs() ############### Nav Evaluation ############### def _eval_item(self, scan, pred_path, pred_objid, gt_path, gt_objid, pred_found, gt_found): scores = {} shortest_distances = self.shortest_distances[scan] path = sum(pred_path, []) assert gt_path[0] == path[0], 'Result trajectories should include the start position' pred_stop_region = node_region[scan][path[-1]] gt_stop_region = node_region[scan][gt_path[-1]] scores['action_steps'] = len(pred_path) - 1 scores['trajectory_steps'] = len(path) - 1 scores['trajectory_lengths'] = np.sum([shortest_distances[a][b] for a, b in zip(path[:-1], path[1:])]) gt_lengths = np.sum([shortest_distances[a][b] for a, b in zip(gt_path[:-1], gt_path[1:])]) # navigation: success is to arrive to a viewpoint where the object is visible goal_viewpoints = set(self.obj2vps['%s_%s'%(scan, str(gt_objid))]) assert len(goal_viewpoints) > 0, '%s_%s'%(scan, str(gt_objid)) scores['found_success'] = float(pred_found == gt_found) scores['success'] = float(path[-1] in goal_viewpoints) scores['room_success'] = float(pred_stop_region == gt_stop_region) scores['oracle_success'] = float(any(x in goal_viewpoints for x in path)) gt_room_start_vp = None gt_back_path = [] gt_front_path = [] exit_room = False for vp in gt_path[::-1]: if node_region[scan][vp] == gt_stop_region and not exit_room: gt_back_path.append(vp) gt_room_start_vp = vp else: exit_room = True gt_front_path.append(vp) gt_front_path.reverse() gt_back_path.reverse() assert (gt_front_path + gt_back_path) == gt_path, "Front path & Back path error" gt_front_path += [gt_room_start_vp] ''' if scores['success'] == 1.0: scores['found_success'] = float(pred_found == gt_found) else: scores['found_success'] = 0.0 ''' gt_reach_length = np.sum([shortest_distances[a][b] for a, b in zip(gt_front_path[:-1], gt_front_path[1:])]) if len(gt_front_path) != 1 else 0.01 gt_explore_length = np.sum([shortest_distances[a][b] for a, b in zip(gt_back_path[:-1], gt_back_path[1:])]) if len(gt_back_path) != 1 else 0.01 if scores['room_success'] != 0.0: # corse-grained # get the reach_path & explore_path room_start_vp = None back_path = [] front_path = [] exit_room = False for vp in path[::-1]: if node_region[scan][vp] == gt_stop_region and not exit_room: back_path.append(vp) room_start_vp = vp else: exit_room = True front_path.append(vp) front_path.reverse() back_path.reverse() assert (front_path + back_path) == path, "Front path & Back path error" # front_path = ... room_start_vp # back_path = room_start_vp ... front_path += [room_start_vp] reach_length = np.sum([shortest_distances[a][b] for a, b in zip(front_path[:-1], front_path[1:])]) if len(front_path) != 1 else 0.01 explore_length = np.sum([shortest_distances[a][b] for a, b in zip(back_path[:-1], back_path[1:])]) if len(back_path) != 1 else 0.01 scores['room_spl'] = scores['room_success'] * gt_reach_length / max(reach_length, gt_reach_length, 0.01) if scores['found_success'] != 0.0: # fine-grained score # p is converage rate if gt_found: p = 1.0 else: explore_objs = set() for vp in back_path: explore_objs.update(vp2objs[vp]) p = len(explore_objs) / len(region2objs[scan][gt_stop_region]) scores['coverage_rate'] = p scores['explore_spl'] = scores['room_success'] * scores['found_success'] * gt_explore_length / max(gt_explore_length, explore_length, 0.01) * p else: scores['coverage_rate'] = 0 scores['explore_spl'] = 0 else: scores['room_spl'] = 0.0 scores['coverage_rate'] = 0 scores['explore_spl'] = 0 scores['spl'] = scores['success'] * gt_lengths / max(scores['trajectory_lengths'], gt_lengths, 0.01) ''' scores['sspl_1'] = scores['success'] * gt_lengths / max(scores['trajectory_lengths'], gt_lengths, 0.01) * scores['found_success'] scores['sspl_2'] = scores['room_success'] * gt_lengths / max(scores['trajectory_lengths'], gt_lengths, 0.01) * scores['found_success'] scores['sspl_3'] = scores['oracle_success'] * gt_lengths / max(scores['trajectory_lengths'], gt_lengths, 0.01) * scores['found_success'] scores['ss_1'] = scores['success'] * scores['found_success'] scores['ss_2'] = scores['room_success'] * scores['found_success'] scores['ss_3'] = scores['oracle_success'] * scores['found_success'] ''' scores['sspl'] = scores['spl'] * scores['found_success'] scores['rgs'] = str(pred_objid) == str(gt_objid) scores['rgspl'] = scores['rgs'] * gt_lengths / max(scores['trajectory_lengths'], gt_lengths, 0.01) return scores def eval_metrics(self, preds): ''' Evaluate each agent trajectory based on how close it got to the goal location the path contains [view_id, angle, vofv]''' print('eval %d predictions' % (len(preds))) print(preds[0]) metrics = defaultdict(list) for item in preds: instr_id = item['instr_id'] traj = item['trajectory'] pred_objid = item.get('pred_objid', None) scan, gt_traj, gt_objid = self.gt_trajs[instr_id] pred_found = item['found'] gt_found = item['gt_found'] traj_scores = self._eval_item(scan, traj, pred_objid, gt_traj, gt_objid, pred_found, gt_found) # record "success" in the result file # let the visualization tool can get the success status item['success'] = traj_scores['success'] for k, v in traj_scores.items(): metrics[k].append(v) metrics['instr_id'].append(instr_id) avg_metrics = { 'action_steps': np.mean(metrics['action_steps']), 'steps': np.mean(metrics['trajectory_steps']), 'lengths': np.mean(metrics['trajectory_lengths']), 'sr': np.mean(metrics['success']) * 100, 'oracle_sr': np.mean(metrics['oracle_success']) * 100, 'spl': np.mean(metrics['spl']) * 100, 'sspl': np.mean(metrics['sspl']) * 100, 'rgs': np.mean(metrics['rgs']) * 100, 'rgspl': np.mean(metrics['rgspl']) * 100, 'found_sr': np.mean(metrics['found_success']) * 100, 'room_sr': np.mean(metrics['room_success']) * 100, 'room_spl': np.mean(metrics['room_spl']) * 100, 'coverage_rate': np.mean(metrics['coverage_rate']) * 100, 'explore_spl': np.mean(metrics['explore_spl']) * 100, } return avg_metrics, metrics