''' Batched Room-to-Room navigation environment '''

import sys
from networkx.algorithms import swap

sys.path.append('buildpy36')
sys.path.append('Matterport_Simulator/build/')
import MatterSim

import csv
import numpy as np
import math
import base64
import utils
import json
import os
import random
import networkx as nx
from param import args
import time

from utils import load_datasets, load_nav_graphs, pad_instr_tokens
from IPython import embed

csv.field_size_limit(sys.maxsize)


class EnvBatch():
    ''' A simple wrapper for a batch of MatterSim environments,
        using discretized viewpoints and pretrained features '''

    def __init__(self, feature_store=None, batch_size=100):
        """
        1. Load the pretrained image features.
        2. Init the simulators.
        :param feature_store: the name of the file storing the features, or an
                              already-loaded {long_id: feature} dict.
        :param batch_size: used to create the simulator list.
        """
        if feature_store:
            if type(feature_store) is dict:     # A silly way to avoid multiple reading
                self.features = feature_store
                self.image_w = 640
                self.image_h = 480
                self.vfov = 60
                self.feature_size = next(iter(self.features.values())).shape[-1]
                print('The feature size is %d' % self.feature_size)
        else:
            print(' Image features not provided - in testing mode')
            self.features = None
            self.image_w = 640
            self.image_h = 480
            self.vfov = 60

        self.sims = []
        for i in range(batch_size):
            sim = MatterSim.Simulator()
            sim.setRenderingEnabled(False)          # observations come from precomputed features, not rendered RGB
            sim.setDiscretizedViewingAngles(True)   # heading/elevation change in 30 degree steps (otherwise in radians)
            sim.setCameraResolution(self.image_w, self.image_h)
            sim.setCameraVFOV(math.radians(self.vfov))
            sim.initialize()
            self.sims.append(sim)

    def _make_id(self, scanId, viewpointId):
        return scanId + '_' + viewpointId

    def newEpisodes(self, scanIds, viewpointIds, headings):
        for i, (scanId, viewpointId, heading) in enumerate(zip(scanIds, viewpointIds, headings)):
            self.sims[i].newEpisode([scanId], [viewpointId], [heading], [0])

    def getStates(self):
        """
        Get list of states augmented with precomputed image features. rgb field will be empty.
        Agent's current view [0-35] (set only when viewing angles are discretized)
            [0-11] looking down, [12-23] looking at horizon, [24-35] looking up
        :return: [ ((36, 2048), sim_state) ] * batch_size
        """
        feature_states = []
        for i, sim in enumerate(self.sims):
            state = sim.getState()[0]

            long_id = self._make_id(state.scanId, state.location.viewpointId)
            if self.features:
                feature = self.features[long_id]
                feature_states.append((feature, state))
            else:
                feature_states.append((None, state))
        return feature_states

    def makeActions(self, actions):
        ''' Take an action using the full state dependent action interface (with batched input).
            Every action element should be an (index, heading, elevation) tuple. '''
        for i, (index, heading, elevation) in enumerate(actions):
            self.sims[i].makeAction(index, heading, elevation)
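
# Typical driving loop for EnvBatch (illustrative comment sketch; R2RBatch below wraps it):
#   env = EnvBatch(feature_store=feat_dict, batch_size=B)   # feat_dict: the long-id -> feature dict
#   env.newEpisodes(scanIds, viewpointIds, headings)         # start one episode per batch element
#   feature_states = env.getStates()                         # [(features, sim_state)] * B
#   env.makeActions(actions)                                 # one (index, heading, elevation) tuple per element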


class R2RBatch():
    ''' Implements the Room to Room navigation task, using discretized viewpoints and pretrained features '''

    def __init__(self, feature_store, batch_size=100, seed=10, splits=['train'], tokenizer=None,
                 name=None):
        self.env = EnvBatch(feature_store=feature_store, batch_size=batch_size)
        if feature_store:
            self.feature_size = self.env.feature_size
        else:
            self.feature_size = 2048
        self.data = []
        if tokenizer:
            self.tok = tokenizer
        scans = []
        for split in splits:
            max_len = 0
            for i_item, item in enumerate(load_datasets([split])):
                max_len = i_item        # index of the last item read from this split
                # if args.test_only and i_item == 64:
                #     break
                if "/" in split:
                    try:
                        new_item = dict(item)
                        new_item['instr_id'] = item['path_id']
                        new_item['instructions'] = item['instructions'][0]
                        new_item['instr_encoding'] = item['instr_enc']
                        if new_item['instr_encoding'] is not None:  # Filter the wrong data
                            self.data.append(new_item)
                            scans.append(item['scan'])
                    except:
                        continue
                else:
                    # Split multiple instructions into separate entries
                    for j, instr in enumerate(item['instructions']):
                        try:
                            new_item = dict(item)
                            new_item['instr_id'] = '%s_%d' % (item['path_id'], j)
                            new_item['instructions'] = instr
                            new_item['swap'] = item['swap'][j]

                            ''' BERT tokenizer '''
                            instr_tokens = tokenizer.tokenize(instr)
                            padded_instr_tokens, num_words = pad_instr_tokens(instr_tokens, args.maxInput)
                            new_item['instr_encoding'] = tokenizer.convert_tokens_to_ids(padded_instr_tokens)

                            if new_item['instr_encoding'] is not None:  # Filter the wrong data
                                self.data.append(new_item)
                                scans.append(item['scan'])
                        except:
                            continue
            print("split {} has {} entries in the file.".format(split, max_len + 1))

        if name is None:
            self.name = splits[0] if len(splits) > 0 else "FAKE"
        else:
            self.name = name

        self.scans = set(scans)
        self.splits = splits
        self.seed = seed
        random.seed(self.seed)
        random.shuffle(self.data)

        self.ix = 0
        self.batch_size = batch_size
        self._load_nav_graphs()

        self.angle_feature = utils.get_all_point_angle_feature()
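        # Presumably a per-view-index table of angle embeddings: _get_obs() indexes it with the
        # current viewIndex and concatenates the result onto the visual features, and
        # make_candidate() builds the matching per-candidate angle feature via utils.angle_feature().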
        self.sim = utils.new_simulator()
        self.buffered_state_dict = {}

        # In the supervised setup the fake data is simply equal to the real data
        self.fake_data = self.data
        print('R2RBatch loaded with %d instructions, using splits: %s' % (len(self.data), ",".join(splits)))

    def size(self):
        return len(self.data)

    def _load_nav_graphs(self):
        """
        Load the connectivity graph for each scan in self.scans, useful for reasoning about shortest paths.
        Store the graphs {scan_id: graph} in self.graphs
        Store the shortest paths {scan_id: {view_id_x: {view_id_y: [path]} } } in self.paths
        Store the distances in self.distances (same structure as above)
        :return: None
        """
        print('Loading navigation graphs for %d scans' % len(self.scans))
        self.graphs = load_nav_graphs(self.scans)
        self.paths = {}
        for scan, G in self.graphs.items():  # compute all shortest paths
            self.paths[scan] = dict(nx.all_pairs_dijkstra_path(G))
        self.distances = {}
        for scan, G in self.graphs.items():  # compute all shortest path lengths
            self.distances[scan] = dict(nx.all_pairs_dijkstra_path_length(G))
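        # Resulting lookup structure (viewpoint ids 'vp_a'/'vp_b' are placeholders):
        #   self.paths[scan]['vp_a']['vp_b']     -> ['vp_a', ..., 'vp_b'], viewpointIds along the shortest path
        #   self.distances[scan]['vp_a']['vp_b'] -> summed edge weight of that path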

    def _next_minibatch(self, tile_one=False, batch_size=None, **kwargs):
        """
        Store the minibatch in 'self.batch'
        :param tile_one: Repeat the single item at the current index batch_size times
        :return: None
        """
        if batch_size is None:
            batch_size = self.batch_size
        if tile_one:
            batch = [self.data[self.ix]] * batch_size
            self.ix += 1
            if self.ix >= len(self.data):
                random.shuffle(self.data)
                self.ix -= len(self.data)
        else:
            batch = self.data[self.ix: self.ix + batch_size]
            if len(batch) < batch_size:     # wrap around: reshuffle and take the remainder from the front
                random.shuffle(self.data)
                self.ix = batch_size - len(batch)
                batch += self.data[:self.ix]
            else:
                self.ix += batch_size
        self.batch = batch

    def reset_epoch(self, shuffle=False):
        ''' Reset the data index to beginning of epoch. Primarily for testing.
            You must still call reset() for a new episode. '''
        if shuffle:
            random.shuffle(self.data)
        self.ix = 0

    def _shortest_path_action(self, state, goalViewpointId):
        ''' Determine next action on the shortest path to goal, for supervised training. '''
        if state.location.viewpointId == goalViewpointId:
            return goalViewpointId      # Just stop here
        path = self.paths[state.scanId][state.location.viewpointId][goalViewpointId]
        nextViewpointId = path[1]       # path[0] is the current viewpoint
        return nextViewpointId
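
    # make_candidate() compresses the raw 36-view panorama into a list of candidate actions,
    # one per navigable neighbor viewpoint: each candidate keeps the visual feature of the
    # single view that sees it best (smallest angular offset) plus an angle feature relative
    # to the agent's current heading.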

    def make_candidate(self, feature, scanId, viewpointId, viewId):
        def _loc_distance(loc):
            return np.sqrt(loc.rel_heading ** 2 + loc.rel_elevation ** 2)

        # viewId is the discretized view index [0-35]
        base_heading = (viewId % 12) * math.radians(30)
        adj_dict = {}
        long_id = "%s_%s" % (scanId, viewpointId)

        if long_id not in self.buffered_state_dict:
            # sweep all 36 discretized views of this panorama
            for ix in range(36):
                if ix == 0:
                    self.sim.newEpisode([scanId], [viewpointId], [0], [math.radians(-30)])
                elif ix % 12 == 0:
                    self.sim.makeAction([0], [1.0], [1.0])      # next elevation row
                else:
                    self.sim.makeAction([0], [1.0], [0])        # next heading step

                state = self.sim.getState()[0]
                assert state.viewIndex == ix

                # Heading and elevation for the viewpoint center
                heading = state.heading - base_heading
                elevation = state.elevation

                # feature has shape (36, feature_size); visual_feat is the vector for view ix
                visual_feat = feature[ix]

                # get adjacent locations
                for j, loc in enumerate(state.navigableLocations[1:]):
                    # if a loc is visible from multiple views, use the closest
                    # view (in angular distance) as its representation
                    distance = _loc_distance(loc)

                    # Heading and elevation for the loc
                    loc_heading = heading + loc.rel_heading
                    loc_elevation = elevation + loc.rel_elevation
                    angle_feat = utils.angle_feature(loc_heading, loc_elevation)
                    if (loc.viewpointId not in adj_dict or
                            distance < adj_dict[loc.viewpointId]['distance']):
                        adj_dict[loc.viewpointId] = {
                            'heading': loc_heading,
                            'elevation': loc_elevation,
                            "normalized_heading": state.heading + loc.rel_heading,
                            'scanId': scanId,
                            'viewpointId': loc.viewpointId,     # Next viewpoint id
                            'pointId': ix,
                            'distance': distance,
                            'idx': j + 1,
                            'feature': np.concatenate((visual_feat, angle_feat), -1)
                        }
            candidate = list(adj_dict.values())

            # cache the view-invariant fields in the buffer
            self.buffered_state_dict[long_id] = [
                {key: c[key]
                 for key in
                    ['normalized_heading', 'elevation', 'scanId', 'viewpointId',
                     'pointId', 'idx']}
                for c in candidate
            ]
            return candidate
        else:
            candidate = self.buffered_state_dict[long_id]
            candidate_new = []
            for c in candidate:
                c_new = c.copy()
                ix = c_new['pointId']
                normalized_heading = c_new['normalized_heading']
                visual_feat = feature[ix]
                loc_heading = normalized_heading - base_heading
                c_new['heading'] = loc_heading
                angle_feat = utils.angle_feature(c_new['heading'], c_new['elevation'])
                c_new['feature'] = np.concatenate((visual_feat, angle_feat), -1)
                c_new.pop('normalized_heading')
                candidate_new.append(c_new)
            return candidate_new
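
    # Design note: the expensive 36-view simulator sweep is done once per (scan, viewpoint)
    # and only the heading-independent fields are cached; on later visits the heading- and
    # feature-dependent parts ('heading', 'feature') are recomputed from the cached
    # 'normalized_heading' and 'pointId', since they change with the agent's current view.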

    def _get_obs(self):
        obs = []
        # self.env is `EnvBatch`; getStates() returns [(features, sim_state)] * batch_size
        for i, (feature, state) in enumerate(self.env.getStates()):
            # self.batch is the current minibatch, set by _next_minibatch() / reset()
            item = self.batch[i]

            # current view index
            base_view_id = state.viewIndex

            if feature is None:     # testing mode without precomputed features
                feature = np.zeros((36, 2048))

            # Full features
            # candidates are the navigable neighbor viewpoints
            candidate = self.make_candidate(feature, state.scanId, state.location.viewpointId, state.viewIndex)

            # [visual_feature, angle_feature] for all 36 views
            feature = np.concatenate((feature, self.angle_feature[base_view_id]), -1)

            obs.append({
                'instr_id': item['instr_id'],
                'scan': state.scanId,
                'viewpoint': state.location.viewpointId,
                'viewIndex': state.viewIndex,
                'heading': state.heading,
                'elevation': state.elevation,
                'feature': feature,
                'candidate': candidate,
                'navigableLocations': state.navigableLocations,
                'instructions': item['instructions'],
                'teacher': self._shortest_path_action(state, item['path'][-1]),
                'gt_path': item['path'],
                'path_id': item['path_id'],
                'swap': item['swap']
            })
            if 'instr_encoding' in item:
                obs[-1]['instr_encoding'] = item['instr_encoding']
            # For the A2C reward: remaining distance from the current viewpoint to the goal
            obs[-1]['distance'] = self.distances[state.scanId][state.location.viewpointId][item['path'][-1]]
        return obs

    def reset(self, batch=None, inject=False, **kwargs):
        ''' Load a new minibatch / episodes. '''
        if batch is None:           # Draw a new minibatch from the data
            self._next_minibatch(**kwargs)
        else:
            if inject:              # Inject the user-provided batch into the next minibatch
                self._next_minibatch(**kwargs)
                self.batch[:len(batch)] = batch
            else:                   # Otherwise use the user-provided batch directly
                self.batch = batch
        scanIds = [item['scan'] for item in self.batch]
        viewpointIds = [item['path'][0] for item in self.batch]
        headings = [item['heading'] for item in self.batch]

        # self.env is `EnvBatch`
        self.env.newEpisodes(scanIds, viewpointIds, headings)
        return self._get_obs()

    def step(self, actions):
        ''' Take action (same interface as makeActions) '''
        self.env.makeActions(actions)
        return self._get_obs()

    def get_statistics(self):
        stats = {}
        length = 0
        path = 0
        for datum in self.data:
            length += len(self.tok.split_sentence(datum['instructions']))
            path += self.distances[datum['scan']][datum['path'][0]][datum['path'][-1]]
        stats['length'] = length / len(self.data)
        stats['path'] = path / len(self.data)
        return stats
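

if __name__ == '__main__':
    # Minimal smoke-test sketch, not part of the original training pipeline.
    # Assumptions: run from the repo root with the MatterSim build, the Matterport
    # connectivity data, and a split file in this repo's expected format (including
    # the 'swap' field) available; the HuggingFace tokenizer below is only a stand-in
    # for whatever BERT tokenizer the training script actually passes in.
    from transformers import AutoTokenizer

    tok = AutoTokenizer.from_pretrained('bert-base-uncased')
    env = R2RBatch(feature_store=None, batch_size=2, splits=['val_seen'], tokenizer=tok)
    obs = env.reset()
    print('First obs: scan=%s viewpoint=%s, %d candidates' % (
        obs[0]['scan'], obs[0]['viewpoint'], len(obs[0]['candidate'])))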