-
Notifications
You must be signed in to change notification settings - Fork 0
/
Evalutation.py
253 lines (199 loc) · 9.07 KB
/
Evalutation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
from enum import Enum
from random import sample
from typing import Dict
import numpy as np
from matplotlib import pyplot as plt
from pandas import DataFrame
from scipy.stats import stats
from tqdm import tqdm
from Graph_class import Graph
from Metrics.Coccurrences_class import Co_occurrencesGraph
from Metrics.Word2Vec_class import Word2VecGraph
from NodeRank import GraphRanker
from Utils import is_valid_relation, normalize_meshId
def test_DDAs(graph: Graph,
              top_z: int,
              test_set: Dict,
              max_diseases: int) -> float:
    """
    Given the knowledge graph, the method tests it on test_set.

    Candidate diseases are the nodes returned by ``GraphRanker(graph).rank_nodes()``
    whose ``'source'`` entry in ``graph.index_to_info`` is truthy, truncated to the
    first ``max_diseases``. A disease counts as matched when at least one of its
    ``top_z`` nearest drug neighbours is accepted by ``is_valid_relation``.

    :param graph: Knowledge graph
    :param top_z: The top-z drugs discovered by the algorithm
    :param test_set: Benchmark to test
    :param max_diseases: max disease to take into consideration
    :return: fraction of the evaluated diseases that have at least one valid drug
        among their top-z neighbours; 0.0 when there is no disease to evaluate
    """
    ranked_nodes = GraphRanker(graph).rank_nodes()
    i_diseases = [i_node for _, i_node in ranked_nodes
                  if graph.index_to_info[i_node]['source']][:max_diseases]  # intersection

    if not i_diseases:
        # Nothing to evaluate: return early instead of dividing by zero below.
        return 0.0

    matched = 0
    # For all diseases that are contained in both knowledge graph and test set
    for i_disease in tqdm(i_diseases, desc="test_drug_disease"):
        # return the top-top_z drugs
        nearest_neighbors = graph.find_nearest(i_disease, predicate=lambda x: x['obj'] == 'drug', max_=top_z)
        # A single valid drug is enough for the disease to count; any() stops at the first hit.
        if any(is_valid_relation(disease_id=graph.index_to_info[i_disease]['id'],
                                 drug_id=graph.index_to_info[i_neighbor]['id'],
                                 test_set=test_set)
               for i_neighbor, _, _ in nearest_neighbors):
            matched += 1
    return matched / len(i_diseases)
def find_diseases_precisionz(graph: Graph,
                             top_z: int,
                             test_set: Dict,
                             max_diseases: int) -> list:
    """
    Retrieve the precision@z for each disease.

    Candidate diseases are the nodes returned by ``GraphRanker(graph).rank_nodes()``
    whose ``'source'`` entry in ``graph.index_to_info`` is truthy, truncated to the
    first ``max_diseases``.

    :param graph: Knowledge graph
    :param top_z: the top-z drugs discovered by the algorithm
    :param test_set: Benchmark to test
    :param max_diseases: max disease to take into consideration
    :return: one value per disease: the share of its retrieved drug neighbours that
        are valid relations in ``test_set`` (0.0 when no neighbour was retrieved)
    """
    ranked_nodes = GraphRanker(graph).rank_nodes()
    i_diseases = [i_node for _, i_node in ranked_nodes
                  if graph.index_to_info[i_node]['source']][:max_diseases]  # intersection

    precisionz = []
    # For all diseases that are contained in both knowledge graph and test set
    for i_disease in tqdm(i_diseases, desc="test_drug_disease"):
        # return the top-z drugs
        nearest_neighbors = graph.find_nearest(i_disease, predicate=lambda x: x['obj'] == 'drug', max_=top_z)
        matched = sum(1 for i_neighbor, _, _ in nearest_neighbors
                      if is_valid_relation(disease_id=graph.index_to_info[i_disease]['id'],
                                           drug_id=graph.index_to_info[i_neighbor]['id'],
                                           test_set=test_set))
        n_neighbors = len(nearest_neighbors)
        # A disease without any drug neighbour has no hits; record 0.0 rather than dividing by zero.
        precisionz.append(matched / n_neighbors if n_neighbors else 0.0)
    return precisionz
# Kinds of knowledge graph that create_knowledge_graph can build (values are auto-assigned 1, 2).
graphType = Enum('GraphType', ('COOCCURRENCES', 'WORD2VEC'))
def create_knowledge_graph(config: tuple,
                           df_entities: DataFrame,
                           texts) -> Graph:
    """
    Build the knowledge graph described by a configuration.

    :param config: Pair ``(graph type, hyperparams)``; the hyperparams mapping is
        forwarded as keyword arguments to ``populate_adj_matrix``
    :param df_entities: Dataframe of entities
    :param texts: Dataframe of texts for each pubmed id
    :return: The populated knowledge graph, or None when the graph type is not recognised
    """
    kind, hyperparams = config

    # Types are matched by enum value rather than by identity.
    if kind.value == graphType.COOCCURRENCES.value:
        knowledge_graph = Co_occurrencesGraph(df_entities)
    elif kind.value == graphType.WORD2VEC.value:
        knowledge_graph = Word2VecGraph(df_entities, texts)
    else:
        # Unknown type: nothing to build.
        return None

    knowledge_graph.populate_adj_matrix(**hyperparams)
    return knowledge_graph
def model_evaluation(configs,
                     ts_set: Dict,
                     texts,
                     top_z: int,
                     df_entities: DataFrame,
                     max_diseases: int = 100) -> list:
    """
    Evaluate every configuration and rank them by precision.

    :param configs: configurations to try
    :param ts_set: Benchmark to test
    :param top_z: The top-z drugs discovered by the algorithm
    :param texts: Dataframe of texts for each pubmed id
    :param df_entities: Sources information (documents)
    :param max_diseases: max disease to take into consideration
    :return: list of ``(precision, config)`` pairs, best precision first
    """
    scored_configs = []
    # Try each configuration in turn
    for position, config in enumerate(tqdm(configs, desc="gridsearch"), start=1):
        print(f'Config {position} / {len(configs)}: {config}')
        # Build the graph for this configuration, then score it on the benchmark
        knowledge_graph = create_knowledge_graph(config, df_entities, texts)
        precision = test_DDAs(graph=knowledge_graph,
                              top_z=top_z,
                              test_set=ts_set,
                              max_diseases=max_diseases)
        print(f'Precision {precision} for config {config}')
        scored_configs.append((precision, config))
    # Highest precision first; ties keep their evaluation order
    return sorted(scored_configs, key=lambda entry: entry[0], reverse=True)
def plot_hist(x, y):
    """
    Draw and show a bar chart of ``y`` against ``x``.

    The x tick labels are each value of ``x`` multiplied by 10 and suffixed with
    ``%``, placed at positions ``0 .. len(x) - 1``.

    :param x: bar positions
    :param y: bar heights
    """
    plt.figure(figsize=(8, 4))
    axes = plt.axes()

    # Light grey background with a white grid drawn underneath the bars
    axes.set_facecolor("#ececec")
    axes.set_axisbelow(True)
    for axis in (axes.yaxis, axes.xaxis):
        axis.grid(color='white')

    plt.bar(x, y, color="#2985e5", edgecolor="black")
    tick_labels = [f"{value * 10}%" for value in x]
    plt.xticks(range(len(x)), tick_labels)

    plt.xlabel("Precision")
    plt.ylabel("Num of disease")
    plt.title("Precision@Z for the first 100 most important diseases in the KG")
    plt.show()
def sample_disease2drugs(ddas: dict,
                         disease: str,
                         len_sampling: int) -> list[str]:
    """
    Given a DDAs dictionary, randomly sample k drugs associated with a disease.

    When fewer than ``len_sampling + 1`` drugs are associated with the disease,
    all of them are returned instead of a random sample.

    :param ddas: Disease-Drug Association provided by test set
    :param disease: A given disease (its id is normalized before the lookup)
    :param len_sampling: length of sampling
    :return: List of normalized id of drugs sampled
    :raises KeyError: if ``ddas`` is a dict without the normalized disease id
    """
    # random.sample only accepts sequences (sets are rejected since Python 3.11),
    # so materialise the associated drugs as a list whatever collection ddas holds.
    drug_ids = list(ddas[normalize_meshId(disease)])
    if len_sampling < len(drug_ids):
        chosen = sample(drug_ids, len_sampling)
    else:
        chosen = drug_ids
    return [normalize_meshId(drug_id) for drug_id in chosen]
def sample_drugs(graph: Graph, len_sampling: int):
    """
    Randomly pick k drugs from the Knowledge graph.

    :param graph: Knowledge graph
    :param len_sampling: length of sampling
    :return: List of normalized ids of the sampled drugs; every drug of the graph
        when there are not more than ``len_sampling`` of them
    """
    # Collect the normalized id of every node flagged as a drug
    all_drugs = []
    for node_id, node_info in graph.id_to_info.items():
        if node_info['obj'] == 'drug':
            all_drugs.append(normalize_meshId(node_id))

    if len_sampling >= len(all_drugs):
        return all_drugs
    return sample(all_drugs, len_sampling)
def pValues_DDAs(graph: Graph,
                 ddas: Dict,
                 max_diseases: int = None,
                 n_samples: int = 15) -> list[float]:
    """
    Retrieve the list of p-values of the first diseases in the node ranking that are in the
    intersection between knowledge graph and test set.

    For each disease, drugs sampled from the test set and drugs sampled at random
    from the graph are located among the disease's nearest drug neighbours; the two
    lists of ranks (1-based) are then compared with ``stats.ttest_ind``.

    :param graph: Knowledge graph
    :param ddas: Disease-Drug Association provided by test set
    :param max_diseases: max disease to take into consideration (None keeps all of them)
    :param n_samples: the Number of drugs to sample
    :return: P-value for each disease; a NaN p-value is reported as 0
    """
    # Make the intersection between Test and graph (disease)
    i_diseases = [i_node for _, i_node in GraphRanker(graph).rank_nodes() if graph.index_to_info[i_node]['source']]
    i_diseases = i_diseases[:max_diseases]

    pValues = []
    # Foreach disease
    for i_disease in tqdm(i_diseases, desc="find_pValues_drug_disease"):
        # Sample random drugs by test_set (ddas dictionary) for a given disease.
        # Kept as sets: they are only used for membership tests below.
        sample_true = set(sample_disease2drugs(disease=normalize_meshId(graph.node_ids[i_disease]),
                                               ddas=ddas,
                                               len_sampling=n_samples))
        # Sample random drugs by Knowledge graph
        sample_random = set(sample_drugs(graph=graph,
                                         len_sampling=n_samples))

        # Distinct ids to locate. The two samples may share ids, so the threshold is
        # the size of their union: the sum of both sizes could never be reached by
        # the `storage` set in that case, and the search would never stop early.
        targets = sample_true | sample_random
        storage = set()

        def stop_condition(x):
            # Record each sampled id as it is visited; report True once all were seen.
            norm_id = normalize_meshId(x['id'])
            if norm_id in targets:
                storage.add(norm_id)
            return len(storage) >= len(targets)

        # presumably find_nearest stops exploring once stop_condition returns True -- confirm in Graph
        nearest_neighbors = graph.find_nearest(i_disease,
                                               predicate=lambda x: x['obj'] == 'drug',
                                               max_=None,
                                               stop_condition=stop_condition)

        # Ranks (1-based positions among the neighbours) of the sampled drugs
        scores_true, scores_random = [], []
        for rank, (i_neighbor, _, _) in enumerate(nearest_neighbors, start=1):
            id_node = normalize_meshId(graph.node_ids[i_neighbor])
            if id_node in sample_true:
                scores_true.append(rank)
            if id_node in sample_random:
                scores_random.append(rank)

        pValue = stats.ttest_ind(scores_true, scores_random)[1]
        # NOTE(review): a NaN p-value (e.g. too few ranks to run the test) is stored as 0,
        # which reads as maximally significant -- confirm this is intended.
        pValues.append(0 if np.isnan(pValue) else pValue)
    return pValues