gaussian_spectral_clustering.py
import numpy as np
from scipy.stats import multivariate_normal
from utils import random_choice_prob_index, rx


def compute_pcs_and_project(data_mat):
    """
    Implements the 'Compute PCs and Project' block from the Beaven paper
    Uses the SVD to compute the PCA representation of the given data
    :param data_mat: (num_features, num_samples) input data to project into PCA coordinates
    :return: Tuple containing the projected data and the eigenvectors used
    """
    # Step 1: Compute the covariance matrix (np.cov treats each row of data_mat as a variable)
    gamma = np.cov(data_mat)
    # Step 2: Compute the SVD of the covariance
    U, S, Vh = np.linalg.svd(gamma)
    # Step 3: Retain the eigenvectors of the covariance
    eig_vecs = U.copy()
    # Step 4: Project the demeaned data_mat onto U
    # (the leading minus only flips the sign of each component, which is arbitrary for PCA)
    mean_vec = np.mean(data_mat, axis=1, keepdims=True)
    x_pca = -U.transpose().dot(data_mat - mean_vec)
    return x_pca, eig_vecs
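

# A minimal usage sketch (not from the Beaven paper): project synthetic
# (num_features, num_samples) data and check the output shapes. The band and
# pixel counts here are illustrative assumptions.
def _demo_compute_pcs_and_project():
    rng = np.random.default_rng(0)
    data = rng.normal(size=(10, 500))  # 10 bands, 500 pixels
    x_pca, eig_vecs = compute_pcs_and_project(data)
    assert x_pca.shape == (10, 500)    # same shape as the input, in PC coordinates
    assert eig_vecs.shape == (10, 10)  # one eigenvector per column

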
def compute_multivariate_gaussian_statistics(data_mat, threshold=None):
"""
Implements the 'Compute MV Gaussian Statistics' block from the Beaven paper
Uses the multivariate stats to do RX anomaly detection for culling pixels
:param data_mat: (num_features, num_samples) data matrix (PC bands N+1-end, trailing order PCs)
:param threshold: Optional threshold value for RX algorithm for classifying outliers. Default is to classify
anything greater than 2 std. deviations away from the mean as an outlier
:return: Tuple containing indices of the outliers and valid data in the data_mat
"""
rx_vec = rx(data_mat)
if threshold is None:
# Define outlier threshold as anything greater than 2 std. deviations away from the mean
threshold = rx_vec.mean() + 2 * rx_vec.std()
outlier_ixs = np.nonzero(rx_vec > threshold)[0]
valid_ixs = np.nonzero(rx_vec <= threshold)[0]
return outlier_ixs, valid_ixs
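

# A minimal usage sketch: cull outliers from synthetic trailing-PC data using
# the default 2-sigma threshold. This assumes utils.rx returns one RX score per
# sample, as its use above implies; the data is an illustrative assumption.
def _demo_cull_outliers():
    rng = np.random.default_rng(1)
    trailing_pcs = rng.normal(size=(5, 500))
    outlier_ixs, valid_ixs = compute_multivariate_gaussian_statistics(trailing_pcs)
    # Every sample lands in exactly one of the two index sets
    assert len(outlier_ixs) + len(valid_ixs) == 500

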
def initial_class_assignment(data_mat, num_classes, method='rand', init_indices=None):
"""
Implements the 'Initial Class Assignment' block from the Beaven paper
Either randomly assigns all samples to classes, or takes in a spectral vector for each class to use as a starting
point for class means.
:param data_mat: (num_features, num_samples) data matrix (PC bands 1-N, leading order PCs)
:param num_classes: Number of classes to cluster into
:param method: ('rand', 'select') String specifying which initialization method to choose
:param init_indices: (num_classes,) List-type containing one sample index into data_mat per class to use
with the 'select' initialization method
    :return: ((num_samples,), (num_features, num_classes), (num_features, num_features, num_classes)) Tuple containing
             Numpy arrays for initial class membership indices (ranging from 0 to num_classes-1), class means, and
             class covariances, respectively
"""
num_features, num_samples = data_mat.shape
class_mean = np.zeros([num_features, num_classes])
class_cov = np.zeros([num_features, num_features, num_classes])
if method == 'rand':
# Option 1 - Random assignment
cluster_member_ix = np.random.randint(0, high=num_classes, size=num_samples)
# For each class, find all members of that class and calculate a class mean and covariance
for class_idx in range(num_classes):
ixs = np.nonzero(cluster_member_ix == class_idx)[0]
class_mean[:, class_idx] = data_mat[:, ixs].mean(axis=1)
class_cov[:, :, class_idx] = np.cov(data_mat[:, ixs])
    elif method == 'select' and init_indices is not None and len(init_indices) == num_classes:
# Option 2 - Selective sampling
# Use the given indices to initialize the class means
for class_idx in range(num_classes):
class_mean[:, class_idx] = data_mat[:, init_indices[class_idx]]
# Calculate global covariance and use that as starting point for each class
# Create num_classes copies of the covariance matrix along the third dimension
# TODO: Check this
class_cov = np.dstack([np.cov(data_mat)] * num_classes)
# TODO: How do I actually initially assign samples in this case? For now, just doing it randomly
cluster_member_ix = np.random.randint(0, high=num_classes, size=num_samples)
    else:
        raise ValueError('Invalid method given - did you pick "select" with an invalid init_indices param?')
return cluster_member_ix, class_mean, class_cov
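

# A minimal usage sketch: exercise both initialization modes on synthetic
# leading-PC data. The class count and seed indices are illustrative assumptions.
def _demo_initial_class_assignment():
    rng = np.random.default_rng(2)
    leading_pcs = rng.normal(size=(4, 600))
    # Option 1 - random assignment
    ixs, means, covs = initial_class_assignment(leading_pcs, num_classes=3, method='rand')
    assert means.shape == (4, 3) and covs.shape == (4, 4, 3)
    # Option 2 - selective sampling, one seed sample index per class
    ixs, means, covs = initial_class_assignment(leading_pcs, num_classes=3,
                                                method='select', init_indices=[0, 1, 2])

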
def compute_class_statistics(data_mat, class_ixs, num_classes):
"""
Implements the 'Compute Class Statistics' block from the Beaven paper
Computes class means, covariances, and prior probabilities.
:param data_mat: (num_features, num_samples) data matrix (PC bands 1-N, leading order PCs)
:param class_ixs: (num_samples) Numpy array containing class membership indices
:param num_classes: Number of classes to cluster into
    :return: ((num_features, num_classes), (num_features, num_features, num_classes), (num_classes,)) Tuple containing
             calculated class means, covariances, and prior probabilities, respectively
"""
num_features, num_samples = data_mat.shape
class_mean = np.zeros([num_features, num_classes])
class_cov = np.zeros([num_features, num_features, num_classes])
for class_idx in range(num_classes):
ixs = np.nonzero(class_ixs == class_idx)[0]
class_mean[:, class_idx] = data_mat[:, ixs].mean(axis=1)
class_cov[:, :, class_idx] = np.cov(data_mat[:, ixs])
    class_priors = np.bincount(class_ixs, minlength=num_classes)
class_priors = class_priors / np.sum(class_priors)
return class_mean, class_cov, class_priors
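

# A minimal usage sketch: the class priors computed here are relative class
# sizes, so they sum to 1. The data and labels below are illustrative assumptions.
def _demo_compute_class_statistics():
    rng = np.random.default_rng(3)
    leading_pcs = rng.normal(size=(4, 600))
    labels = rng.integers(0, 3, size=600)
    means, covs, priors = compute_class_statistics(leading_pcs, labels, num_classes=3)
    assert np.isclose(priors.sum(), 1.0)

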
def compute_posterior_probability_and_assign(data_mat, class_mean, class_cov, class_priors,
                                             dead_class_threshold=0):
"""
Implements the 'Compute Posterior Probability & Assign to Class' block from the Beaven paper
:param data_mat: (num_features,num_samples) data matrix (PC bands 1-N, leading order PCs)
:param class_mean: (num_features, num_classes) numpy matrix containing prior class means
:param class_cov: (num_features, num_features, num_classes) numpy matrix containing prior class covariance matrices
:param class_priors: (num_classes,) array containing prior class probabilities
    :param dead_class_threshold: Integer specifying the minimum number of members a class may have before it is
                                 considered dead and adaptive class management takes over. Default = 0 = no adaptive
                                 management occurs (may have issues with singular matrices with this default!)
:return: updated_class_ixs: (num_samples,) array containing updated class indices
"""
K, num_samples = data_mat.shape
num_classes = class_mean.shape[1]
posterior_probabilities = np.zeros([num_samples, num_classes])
p_c = np.zeros([num_samples, num_classes])
for class_idx in range(num_classes):
m_c = class_mean[:, class_idx]
K_c = class_cov[:, :, class_idx]
        # Scipy built-in multivariate_normal function, faster and less error prone than a manual implementation
        # (the covariance matrix is symmetric, so no transpose is needed)
        p_c[:, class_idx] = multivariate_normal.pdf(data_mat.transpose(), mean=m_c, cov=K_c)
posterior_probabilities[:, class_idx] = class_priors[class_idx] * p_c[:, class_idx]
# Divide each class posterior probability by the sum of all
posterior_probabilities = posterior_probabilities / np.sum(posterior_probabilities, axis=1, keepdims=True)
# Stochastic assignment of classes
updated_class_ixs = random_choice_prob_index(posterior_probabilities).astype('int64')
    # Get the number of samples in each class (minlength catches fully-empty classes)
    class_counts = np.bincount(updated_class_ixs, minlength=num_classes)
    print('class counts', class_counts)
    # If any class has fewer members than the threshold, consider it "dead" and perform adaptive class management
    dead_class_ixs = np.nonzero(class_counts < dead_class_threshold)[0]
for dead_class in dead_class_ixs:
        print('Dead class (idx {}), fewer members than threshold ({})'.format(dead_class, dead_class_threshold))
updated_class_ixs = adaptive_class_management(data_mat, updated_class_ixs, dead_class, num_classes)
return updated_class_ixs


def adaptive_class_management(data_mat, class_ixs, dead_class_ix, num_classes):
"""
Performs adaptive class management as described in the Beaven paper
:param data_mat: (num_features, num_samples) data matrix (PC bands 1-N, leading order PCs)
:param class_ixs: (num_samples,) array containing class membership indices
:param dead_class_ix: Index of the empty class that is to be removed
:param num_classes: Number of total possible cluster classes
:return: updated_class_ixs: (num_samples,) array containing class indices after adaptive class management
"""
# Step 1 - Nominate a dominant class
class_vars = np.zeros(num_classes)
    # Find the class with the largest variance in the first principal component dimension
for class_idx in range(num_classes):
ixs = np.nonzero(class_ixs == class_idx)[0]
class_vars[class_idx] = np.var(data_mat[0, ixs])
dominant_class_ix = np.argmax(class_vars)
dominant_class_ixs = np.nonzero(class_ixs == dominant_class_ix)[0]
# Step 2 - Split dominant class into 2 subclasses
tmp = data_mat[0, dominant_class_ixs] - np.mean(data_mat[0, dominant_class_ixs])
# Data that has a first principal component value less than the class mean will be transferred to the dead class
    neg_ixs = dominant_class_ixs[tmp < 0]
class_ixs[neg_ixs] = dead_class_ix
# Step 3 - Continue iterating with the new class indices
return class_ixs


def iterate_clustering(data_mat, class_ixs, num_classes, N=100, dead_class_threshold=0):
"""
Iterates through the Compute Class Statistics and Compute Posterior Probability & Assign to Class blocks
:param data_mat: (num_features, num_samples) data matrix (PC bands 1-N, leading order PCs)
:param class_ixs: (num_samples,) array containing class membership indices
:param num_classes: Number of classes to cluster into
:param N: Number of iterations
    :param dead_class_threshold: Integer specifying the minimum number of members a class may have before it is
                                 considered dead and adaptive class management takes over. Default = 0 = no adaptive
                                 management occurs (may have issues with singular matrices with this default!)
:return: (num_samples,) Numpy array containing final class membership indices
"""
updated_class_ixs = class_ixs
for iter_idx in range(N):
# TODO: Need to return/save off the posterior probabilities and make them priors?
class_mean, class_cov, class_priors = compute_class_statistics(data_mat, updated_class_ixs, num_classes)
        updated_class_ixs = compute_posterior_probability_and_assign(data_mat,
                                                                     class_mean,
                                                                     class_cov,
                                                                     class_priors,
                                                                     dead_class_threshold)
print('Finished iteration #', iter_idx)
return updated_class_ixs
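

# A minimal end-to-end sketch of the pipeline on synthetic two-blob data.
# It assumes utils.rx and utils.random_choice_prob_index behave as their uses
# above imply (per-sample RX scores; a stochastic class draw per row of the
# posterior matrix). The blob parameters, leading/trailing band split, and
# iteration count are illustrative assumptions, not values from the paper.
if __name__ == '__main__':
    rng = np.random.default_rng(4)
    # Two Gaussian blobs in a 6-band space, stacked as (num_features, num_samples)
    blob_a = rng.normal(loc=0.0, size=(6, 300))
    blob_b = rng.normal(loc=4.0, size=(6, 300))
    data = np.hstack([blob_a, blob_b])
    # Project onto PCs, then split into leading bands (clustering) and trailing bands (culling)
    x_pca, _ = compute_pcs_and_project(data)
    leading, trailing = x_pca[:3, :], x_pca[3:, :]
    _, valid_ixs = compute_multivariate_gaussian_statistics(trailing)
    leading = leading[:, valid_ixs]
    # Initialize class membership randomly, then iterate the clustering blocks
    init_ixs, _, _ = initial_class_assignment(leading, num_classes=2, method='rand')
    final_ixs = iterate_clustering(leading, init_ixs, num_classes=2, N=10,
                                   dead_class_threshold=5)
    print('final class counts:', np.bincount(final_ixs, minlength=2))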