-
Notifications
You must be signed in to change notification settings - Fork 242
Expand file tree
/
Copy pathutils.py
More file actions
139 lines (109 loc) · 4.49 KB
/
utils.py
File metadata and controls
139 lines (109 loc) · 4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import numpy as np
import time
import copy
############################################
############################################
def calculate_mean_prediction_error(env, action_sequence, models, data_statistics):
    """Compare a real-environment rollout against an open-loop model rollout.

    Executes `action_sequence` in the real `env` (via `perform_actions`) to get
    the true state sequence, then rolls the first model in `models` forward
    open-loop from the first true state, and returns the mean squared error
    between the two sequences.

    Returns:
        (mpe, true_states, pred_states)
    """
    dyn_model = models[0]

    # Ground-truth states from actually stepping the environment.
    true_states = perform_actions(env, action_sequence)['observation']

    # Open-loop prediction: start from the first true state and repeatedly
    # feed the model its own previous prediction.
    obs = np.expand_dims(true_states[0], 0)
    predicted = []
    for act in action_sequence:
        # record the state *before* stepping, so predicted[0] == true_states[0]
        predicted.append(obs)
        obs = dyn_model.get_prediction(obs, np.expand_dims(act, 0), data_statistics)
    predicted = np.squeeze(predicted)

    mpe = mean_squared_error(predicted, true_states)
    return mpe, true_states, predicted
def perform_actions(env, actions):
    """Step `env` through the fixed action sequence `actions`.

    Collects the transitions and packages them into a single rollout
    dictionary via `Path`. Stops early if the environment reports `done`.
    """
    observation = env.reset()
    obs, acs, rewards, next_obs, terminals, image_obs = [], [], [], [], [], []
    steps = 0
    for action in actions:
        obs.append(observation)
        acs.append(action)
        observation, reward, done, _ = env.step(action)
        # observation now holds the post-step state
        next_obs.append(observation)
        rewards.append(reward)
        steps += 1
        # terminal flag is 1 only on the step that ended the episode
        if not done:
            terminals.append(0)
            continue
        terminals.append(1)
        break
    return Path(obs, image_obs, acs, rewards, next_obs, terminals)
def mean_squared_error(a, b):
    """Mean of the elementwise squared differences between `a` and `b`."""
    diff = a - b
    return np.mean(np.square(diff))
############################################
############################################
def sample_trajectory(env, policy, max_path_length, render=False, render_mode=('rgb_array')):
    """Roll out `policy` in `env` for at most `max_path_length` steps.

    Should return a single rollout dictionary (see `Path`).

    Not yet implemented: the original stub had a comment-only body, which is
    a SyntaxError in Python; raising makes the missing implementation explicit.
    """
    # TODO: get this from Piazza
    raise NotImplementedError("sample_trajectory is not implemented yet")
def sample_trajectories(env, policy, min_timesteps_per_batch, max_path_length, render=False, render_mode=('rgb_array')):
    """
    Collect rollouts using policy
    until we have collected min_timesteps_per_batch steps.

    Should return (paths, timesteps_this_batch).

    Not yet implemented: the original stub returned the undefined names
    `paths` and `timesteps_this_batch` (a NameError at call time); raising
    makes the missing implementation explicit.
    """
    # TODO: get this from Piazza
    raise NotImplementedError("sample_trajectories is not implemented yet")
def sample_n_trajectories(env, policy, ntraj, max_path_length, render=False, render_mode=('rgb_array')):
    """
    Collect ntraj rollouts using policy.

    Should return a list of rollout dictionaries (see `Path`).

    Not yet implemented: the original stub returned the undefined name
    `paths` (a NameError at call time); raising makes the missing
    implementation explicit.
    """
    # TODO: get this from Piazza
    raise NotImplementedError("sample_n_trajectories is not implemented yet")
############################################
############################################
def Path(obs, image_obs, acs, rewards, next_obs, terminals):
    """
    Take info (separate arrays) from a single rollout
    and bundle it into one dictionary of numpy arrays.
    """
    # Only stack image frames when any were collected; an empty list stays
    # empty and becomes a size-0 uint8 array below.
    images = np.stack(image_obs, axis=0) if image_obs != [] else image_obs
    return {
        "observation": np.array(obs, dtype=np.float32),
        "image_obs": np.array(images, dtype=np.uint8),
        "reward": np.array(rewards, dtype=np.float32),
        "action": np.array(acs, dtype=np.float32),
        "next_observation": np.array(next_obs, dtype=np.float32),
        "terminal": np.array(terminals, dtype=np.float32),
    }
def convert_listofrollouts(paths):
    """
    Take a list of rollout dictionaries
    and return separate arrays,
    where each array is a concatenation of that array from across the rollouts.

    Rewards are returned both concatenated and as a per-rollout list.
    """
    def _concat(key):
        # stitch one field together across all rollouts
        return np.concatenate([path[key] for path in paths])

    observations = _concat("observation")
    actions = _concat("action")
    next_observations = _concat("next_observation")
    terminals = _concat("terminal")
    concatenated_rewards = _concat("reward")
    unconcatenated_rewards = [path["reward"] for path in paths]
    return (observations, actions, next_observations, terminals,
            concatenated_rewards, unconcatenated_rewards)
############################################
############################################
def get_pathlength(path):
    """Number of timesteps in a rollout (one reward per step)."""
    rewards = path["reward"]
    return len(rewards)
def normalize(data, mean, std, eps=1e-8):
    """Z-score `data`; `eps` keeps the division finite when std is zero."""
    centered = data - mean
    return centered / (std + eps)
def unnormalize(data, mean, std):
    """Invert `normalize` (ignoring its eps term): scale by std, shift by mean."""
    return mean + data * std
def add_noise(data_inp, noiseToSignal=0.01):
data = copy.deepcopy(data_inp) #(num data points, dim)
#mean of data
mean_data = np.mean(data, axis=0)
#if mean is 0,
#make it 0.001 to avoid 0 issues later for dividing by std
mean_data[mean_data == 0] = 0.000001
#width of normal distribution to sample noise from
#larger magnitude number = could have larger magnitude noise
std_of_noise = mean_data * noiseToSignal
for j in range(mean_data.shape[0]):
data[:, j] = np.copy(data[:, j] + np.random.normal(
0, np.absolute(std_of_noise[j]), (data.shape[0],)))
return data