001package com.astrolabsoftware.FinkBrowser.Januser; 002 003import com.Lomikel.Utils.SmallHttpClient; 004import com.Lomikel.Utils.NotifierURL; 005import com.Lomikel.Utils.MapUtil; 006import com.Lomikel.Utils.Pair; 007import com.Lomikel.Utils.LomikelException; 008import com.Lomikel.HBaser.HBaseClient; 009import com.Lomikel.Januser.GremlinRecipies; 010import com.Lomikel.Januser.ModifyingGremlinClient; 011import com.astrolabsoftware.FinkBrowser.HBaser.FinkHBaseClient; 012import com.astrolabsoftware.FinkBrowser.FinkPortalClient.FPC; 013 014// Tinker Pop 015import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; 016import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; 017import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; 018import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.otherV; 019import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.V; 020import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.fold; 021import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.has; 022import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.not; 023import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; 024import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out; 025import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.repeat; 026import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.values; 027import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.count; 028import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; 029import static org.apache.tinkerpop.gremlin.process.traversal.P.within; 030import org.apache.tinkerpop.gremlin.structure.Edge; 031import org.apache.tinkerpop.gremlin.structure.Vertex; 032import org.apache.tinkerpop.gremlin.structure.Property; 033import org.apache.tinkerpop.gremlin.structure.VertexProperty; 034import org.apache.tinkerpop.gremlin.structure.Direction; 035import org.apache.tinkerpop.gremlin.structure.Graph; 036 037// Janus Graph 038import org.janusgraph.core.SchemaViolationException; 039import org.janusgraph.graphdb.vertices.StandardVertex; 040import org.janusgraph.graphdb.database.StandardJanusGraph; 041 042// HBase 043import org.apache.hadoop.hbase.client.Get; 044import org.apache.hadoop.hbase.client.Get; 045 046// org.json 047import org.json.JSONArray; 048import org.json.JSONObject; 049 050// Java Mail 051import javax.mail.MessagingException; 052 053// Java 054import java.util.Arrays; 055import java.util.stream.Collectors; 056import java.util.Iterator; 057import java.util.ArrayList; 058import java.util.List; 059import java.util.Map; 060import java.util.HashMap; 061import java.util.LinkedHashMap; 062import java.util.TreeMap; 063import java.util.SortedSet; 064import java.util.TreeSet; 065import java.util.Set; 066import java.util.HashSet; 067import java.util.Iterator; 068import java.util.Optional; 069import java.util.Calendar; 070import java.util.Date; 071import java.text.SimpleDateFormat; 072import java.util.NoSuchElementException; 073 074// Log4J 075import org.apache.logging.log4j.Logger; 076import org.apache.logging.log4j.LogManager; 077 078/** <code>FinkGremlinRecipies</code> provides various recipies to handle 079 * and modify Gremlin Graphs for Fink. 080 * @opt attributes 081 * @opt operations 082 * @opt types 083 * @opt visibility 084 * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */ 085// TBD: check precodition for methods, wgich doesn't work with 'client' creation 086public class FinkGremlinRecipies extends GremlinRecipies { 087 088 /** Create and attach to {@link GraphTraversalSource}. 089 * @param g The attached {@link GraphTraversalSource}. */ 090 public FinkGremlinRecipies(GraphTraversalSource g) { 091 super(g); 092 } 093 094 /** Create and attach to {@link ModifyingGremlinClient}. 095 * @param client The attached {@link ModifyingGremlinClient}. */ 096 public FinkGremlinRecipies(ModifyingGremlinClient client) { 097 super(client); 098 } 099 100 /** Execute full chain of new sources correlations analyses. 101 * @param classifiers The {@link Classifier}s to be used. 102 * They can contain the {@link Classifier} flavor after <em>=</em> symbol. 103 * @param filter The HBase evaluation formula to be applied. 104 * Ignored if <tt>clss</tt> are specified. 105 * @param hbaseUrl The url of HBase with alerts as <tt>ip:port:table:schema</tt>. 106 * @param nLimit The maximal number of alerts getting from HBase or Fink Portal. 107 * <tt>0</tt> means no limit. 108 * @param timeLimit How far into the past the search should search (in minutes). 109 * @param clss An array of <em>classes</em> taken from {@link FPC}, 110 * if contains <tt>Anomaly</tt>, get anomalies from {@link FPC}, 111 * if <tt>null</tt>, analyse <em>sources</em> from HBase database. 112 * @throws LomikelException If anything fails. */ 113 public void processSoI(Classifier[] classifiers, 114 String filter, 115 String hbaseUrl, 116 int nLimit, 117 int timeLimit, 118 String[] clss) throws LomikelException { 119 fillSoI(classifiers, filter, hbaseUrl, nLimit, timeLimit, clss); 120 generateCorrelations(classifiers); 121 } 122 123 /** Fill graph with <em>SoI</em>. 124 * @param classifiers The {@link Classifier}s to be used. 125 * @param filter The HBase evaluation formula to be applied. 126 * Ignored if <tt>clss</tt> are specified. 127 * @param hbaseUrl The url of HBase with alerts as <tt>ip:port:table:schema</tt>. 128 * @param nLimit The maximal number of alerts getting from HBase or Fink Portal. 129 * <tt>0</tt> means no limit. 130 * @param timeLimit How far into the past the search should search (in minutes). 131 * @param clss An array of <em>classes</em> taken from {@link FPC}, 132 * if contains <tt>Anomaly</tt>, get anomalies from {@link FPC}, 133 * if <tt>null</tt>, analyse <em>sources</em> from HBase database. 134 135 * @throws LomikelException If anything fails. */ 136 public void fillSoI(Classifier[] classifiers, 137 String filter, 138 String hbaseUrl, 139 int nLimit, 140 int timeLimit, 141 String[] clss) throws LomikelException { 142 String clssDesc = ""; 143 if (clss != null) { 144 clssDesc = "of " + Arrays.toString(clss); 145 } 146 log.info("Filling SoI " + clssDesc + " using " + Arrays.toString(classifiers) + " classifiers, nLimit = " + nLimit + ", timeLimit = " + timeLimit); 147 log.info("Importing from " + hbaseUrl + ":"); 148 fhclient(hbaseUrl); 149 Set<String> oids = new HashSet<>();; 150 if (clss == null) { 151 fhclient().setEvaluation(filter); 152 if (nLimit > 0) { 153 fhclient().setLimit(nLimit); 154 } 155 oids = fhclient().latestsT("i:objectId", 156 null, 157 timeLimit, 158 true); 159 fhclient().setEvaluation(null); 160 } 161 else { 162 Calendar cal; 163 Date d; 164 String sd; 165 JSONArray ja; 166 JSONObject jo; 167 for (String cls : clss) { 168 cal = Calendar.getInstance(); 169 cal.add(Calendar.MINUTE, -nLimit); 170 d = cal.getTime(); 171 sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(d); 172 if (cls.equals("*")) { 173 ja = FPC.anomaly(new JSONObject().put("n", nLimit). 174 put("startdate", sd). 175 put("columns", "i:objectId"). 176 put("output-format", "json")); 177 } 178 else { 179 ja = FPC.latests(new JSONObject().put("n", nLimit). 180 put("class", cls). 181 put("startdate", sd). 182 put("columns", "i:objectId"). 183 put("output-format", "json")); 184 } 185 for (int i = 0; i < ja.length(); i++) { 186 jo = ja.getJSONObject(i); 187 oids.add(jo.getString("i:objectId")); 188 } 189 log.info("*** " + cls + "[" + ja.length() + "]:"); 190 } 191 } 192 classifySources(classifiers, oids, hbaseUrl); 193 } 194 195 /** Classify <em>source</em> . 196 * @param classifiers The {@link Classifier}s to be used. 197 * @param oids The {@link Set} of <tt>objectId</tt>s of source to be added. 198 * @param hbaseUrl The url of HBase with alerts as <tt>ip:port:table:schema</tt>. 199 * @throws LomikelException If anything fails. */ 200 public void classifySources(Classifier[] classifiers, 201 Set<String> oids, 202 String hbaseUrl) throws LomikelException { 203 int size = oids.size(); 204 int n = 0; 205 long dt; 206 double freq; 207 long startTime = System.currentTimeMillis(); 208 // loop over all sources 209 for (String oid : oids) { 210 log.info(oid + " (" + n + " of " + size + "):"); 211 for (Classifier classifier : classifiers) { 212 try { 213 classifySource(classifier, oid, hbaseUrl); 214 } 215 catch (LomikelException e) { 216 log.error("Cannot get classification for " + oid, e); 217 } 218 } 219 n++; 220 dt = (System.currentTimeMillis() - startTime) / 1000; 221 freq = (double)n / (double)dt; 222 log.info("\t\twith " + String.format("%.2f", freq) + " Hz"); 223 } 224 } 225 226 /** Classify <em>source</em>. 227 * @param classifier The {@link Classifier} to be used. 228 * @param objectId The <tt>objectId</tt> of source to be added. 229 * @param hbaseUrl The url of HBase with alerts as <tt>ip:port:table:schema</tt>. 230 * @throws LomikelException If anything fails. */ 231 public void classifySource(Classifier classifier, 232 String objectId, 233 String hbaseUrl) throws LomikelException { 234 fhclient(hbaseUrl); 235 classifySource(classifier, objectId); 236 } 237 238 /** Classify <em>source</em>. 239 * @param classifier The {@link Classifier} to be used. 240 * @param objectId The <tt>objectId</tt> of source to be added. 241 * @throws LomikelException If anything fails. */ 242 public void classifySource(Classifier classifier, 243 String objectId) throws LomikelException { 244 if (g().V().has("lbl", "source").has("objectId", objectId).hasNext()) { 245 Vertex v1 = g().V().has("lbl", "source").has("objectId", objectId).next(); 246 List<Vertex> v2s = g().V(v1).in(). 247 has("lbl", "SoI"). 248 has("classifier", classifier.name()). 249 has("flavor", classifier.flavor()). 250 toList(); 251 Iterator<Edge> edges; 252 for (Vertex v2 : v2s) { 253 edges = g().V(v1).inE(). 254 has("lbl", "deepcontains"). 255 where(otherV(). 256 is(v2)). 257 toStream(). 258 iterator(); 259 while (edges.hasNext()) { 260 edges.next().remove(); 261 } 262 } 263 // will be commited in registration 264 } 265 classifier.classify(this, objectId); 266 } 267 268 /** Register <em>source</em> in <em>SoI</em>. 269 * @param classifier The {@link Classifier} to be used. 270 * @param cls The type (class) of <em>SoI</em> {@link Vertex}. 271 * It will be created if not yet exists. 272 * @param objectId The objectId of the new <em>Source</em> {@link Vertex}. 273 * It will be created if not yet exists. 274 * @param weight The weight of the connection. 275 * Usualy the number of <em>Alerts</em> of this type. 276 * @param instanceS The <em>jd</em> of related <em>Alerts</em> as strings separated by comma. 277 * Potential square brackets are removed. 278 * May be <tt>null</tt> or empty. */ 279 public void registerSoI(Classifier classifier, 280 String cls, 281 String objectId, 282 double weight, 283 String instancesS, 284 String weightsS) { 285 List<String> instances = new ArrayList<>(); 286 List<Double> weights = new ArrayList<>(); 287 if (instancesS != null && !instancesS.trim().equals("")) { 288 for (String instance : instancesS.replaceAll("\\[", "").replaceAll("]", "").split(",")) { 289 instances.add(instance); 290 } 291 } 292 if (weightsS != null && !weightsS.trim().equals("")) { 293 for (String weighs : weightsS.replaceAll("\\[", "").replaceAll("]", "").split(",")) { 294 weights.add(Double.valueOf(weight)); 295 } 296 } 297 registerSoI(classifier, cls, objectId, weight, instances, weights); 298 } 299 300 /** Register <em>source</em> in <em>SoI</em>. 301 * @param classifier The {@link Classifier} to be used. 302 * @param cls The type (class) of <em>SoI</em> {@link Vertex}. 303 * It will be created if not yet exists. 304 * @param objectId The objectId of the new <em>Source</em> {@link Vertex}. 305 * It will be created if not yet exists. 306 * @param weight The total weight of the connection. 307 * Usualy the number of <em>Alerts</em> of this type. 308 * @param instances The <em>jd</em> of related <em>Alerts</em>. 309 * @param weights The weights of related <em>Alerts</em>. */ 310 public void registerSoI(Classifier classifier, 311 String cls, 312 String objectId, 313 double weight, 314 List<String> instances, 315 List<Double> weights) { 316 log.info("\tregistering " + objectId + " as " + classifier + " / " + cls + " with weight " + weight); 317 Vertex soi = g().V().has("lbl", "SoI" ). 318 has("classifier", classifier.name() ). 319 has("flavor", classifier.flavor()). 320 has("cls", cls ). 321 fold(). 322 coalesce(unfold(), 323 addV("SoI"). 324 property("lbl", "SoI"). 325 property("classifier", classifier.name() ). 326 property("flavor", classifier.flavor()). 327 property("cls", cls )). 328 next(); 329 Vertex s = g().V().has("lbl", "source"). 330 has("objectId", objectId). 331 fold(). 332 coalesce(unfold(), 333 addV("source"). 334 property("lbl", "source"). 335 property("objectId", objectId)). 336 property("importDate", _now). 337 next(); 338 addEdge(g().V(soi).next(), 339 g().V(s).next(), 340 "deepcontains", 341 new String[]{"weight", 342 "instances", 343 "weights"}, 344 new String[]{"" + weight, 345 instances.toString().replaceFirst("\\[", "").replaceAll("]", ""), 346 weights.toString().replaceFirst("\\[", "").replaceAll("]", "")}, 347 true); 348 commit(); 349 } 350 351 /** Clean tree under <em>SoI</em>. 352 * Drop alerts. Alerts are dropped even if they have other 353 * {@link Edge}s. 354 * @param classifier The {@link Classifier} to be used. 355 * @param cls The type (class) of <em>SoI</em>. 356 * @throws LomikelException If anything goes wrong. */ 357 public void cleanSoI(Classifier classifier, 358 String cls) throws LomikelException { 359 log.info("Cleaning " + cls + " SoI"); 360 g().V().has("lbl", "SoI"). 361 has("classifier", classifier.name() ). 362 has("flavor", classifier.flavor()). 363 has("cls", cls ). 364 out(). 365 out(). 366 drop(). 367 iterate(); 368 commit(); 369 } 370 371 /** Generate <em>overlaps</em> Edges between <em>SoI</em>. 372 * Possibly between two {@link Classifier}s. 373 * @param classifier The {@link Classifier}s to be used. */ 374 public void generateCorrelations(Classifier... classifiers) { 375 log.info("Generating correlations for SoI for " + classifiers); 376 List<String> namesL = new ArrayList<>(); 377 List<String> flavorsL = new ArrayList<>(); 378 for (Classifier classifier : classifiers) { 379 namesL.add( classifier.name() ); 380 flavorsL.add(classifier.flavor()); 381 // Clean all correlations 382 g().V().has("lbl", "SoI" ). 383 has("classifier", classifier.name() ). 384 has("flavor", classifier.flavor()). 385 bothE(). 386 has("lbl", "overlaps"). 387 drop(). 388 iterate(); 389 // Remove wrong SoI 390 g().V().has("lbl", "SoI" ). 391 has("classifier", classifier.name() ). 392 has("flavor", classifier.flavor()). 393 not(has("cls")). 394 drop(). 395 iterate(); 396 } 397 String[] names = namesL.toArray( String[]::new); 398 String[] flavors = flavorsL.toArray(String[]::new); 399 commit(); 400 // Accumulate correlations and sizes 401 Map<String, Double> weights0 = new HashMap<>(); // cls -> weight (for one source) 402 Map<Pair<String, String>, Double> corrS = new HashMap<>(); // [cls1, cls2] -> weight (for all sources between SoI-SoI) 403 Map<String, Double> sizeS = new HashMap<>(); // cls -> total (for all sources of SoI) 404 SortedSet<String> types0 = new TreeSet<>(); // [cls] (for one source) 405 SortedSet<String> types = new TreeSet<>(); // [cls] (for all sources) 406 Vertex source; 407 Iterator<Edge> deepcontainsIt; 408 Edge deepcontains; 409 double weight; 410 double weight1; 411 double weight2; 412 double cor; 413 Vertex soi; 414 Vertex soi1; 415 Vertex soi2; 416 String cls; 417 Pair<String, String> rel; 418 // Loop over sources and accumulated weights to each source 419 GraphTraversal<Vertex, Vertex> sourceT = g().V().has("lbl", "source"); 420 while (sourceT.hasNext()) { 421 weights0.clear(); 422 types0.clear(); 423 source = sourceT.next(); 424 deepcontainsIt = source.edges(Direction.IN); 425 // Get all weights to this source 426 while (deepcontainsIt.hasNext()) { 427 deepcontains = deepcontainsIt.next(); 428 weight = Double.parseDouble(deepcontains.property("weight").value().toString()); 429 soi1 = deepcontains.outVertex(); 430 cls = soi1.property("cls").value().toString(); 431 types0.add(cls); 432 types.add(cls); 433 weights0.put(cls, weight); 434 } 435 // Double loop over accumulated weights and fill weights between SoIs 436 for (String cls1 : types0) { 437 weight1 = weights0.get(cls1); 438 for (String cls2 : types0) { 439 weight2 = weights0.get(cls2); 440 rel = Pair.of(cls1, cls2); 441 // SoI-SoI 442 if (!corrS.containsKey(rel)) { 443 corrS.put(rel, 0.0); 444 } 445 cor = corrS.get(rel); 446 corrS.put(rel, cor + 1.0); 447 } 448 } 449 } 450 // Fill total sizes 451 double sizeS0; 452 for (String cls1 : types) { 453 sizeS0 = 0.0; 454 for (String cls2 : types) { 455 if (corrS.containsKey(Pair.of(cls1, cls2))) { 456 sizeS0 += corrS.get(Pair.of(cls1, cls2)); 457 } 458 } 459 sizeS.put(cls1, sizeS0); 460 } 461 // Create overlaps 462 int ns = 0; 463 // Double-loop over SoI and create overlaps Edge SoI-SoI if non empty 464 // NOTE: it takes all SoI names and all SoI flavors (even if they are not requested in all combinations) 465 for (String cls1 : types) { 466 try { 467 soi1 = g().V().has("lbl", "SoI" ). 468 has("classifier", within(names) ). 469 has("flavor", within(flavors)). 470 has("cls", cls1 ). 471 next(); 472 for (String cls2 : types) { 473 if (corrS.containsKey(Pair.of(cls1, cls2))) { 474 try { 475 soi2 = g().V().has("lbl", "SoI" ). 476 has("classifier", within(names) ). 477 has("flavor", within(flavors)). 478 has("cls", cls2 ). 479 next(); 480 addEdge(g().V(soi1).next(), 481 g().V(soi2).next(), 482 "overlaps", 483 new String[]{"intersection", 484 "sizeIn", 485 "sizeOut"}, 486 new Double[]{corrS.get(Pair.of(cls1, cls2)), 487 sizeS.get(cls1), 488 sizeS.get(cls2)}, 489 true); 490 ns++; 491 } 492 catch (NoSuchElementException e) { 493 log.debug("SoI for " + cls2 + " doesn't exist"); 494 } 495 } 496 } 497 } 498 catch (NoSuchElementException e) { 499 log.debug("SoI for " + cls1 + " doesn't exist"); 500 } 501 } 502 commit(); 503 log.info("" + ns + " source-source correlations generated"); 504 } 505 506 /** Create a new {@link FinkHBaseClient}. Singleton when url unchanged. 507 * @param hbaseUrl The HBase url as <tt>ip:port:table[:schema]</tt>. 508 * @return The corresponding {@link FinkHBaseClient}, created and initialised if needed. 509 * @throws LomikelException If cannot be created. */ 510 public FinkHBaseClient fhclient(String hbaseUrl) throws LomikelException { 511 if (hbaseUrl == null || hbaseUrl.equals(_fhclientUrl)) { 512 return _fhclient; 513 } 514 _fhclientUrl = hbaseUrl; 515 String[] url = hbaseUrl.split(":"); 516 String ip = url[0]; 517 String port = url[1]; 518 String table = url[2]; 519 String schema = ""; 520 if (url.length >= 4) { 521 schema = url[3]; 522 } 523 _fhclient = new FinkHBaseClient(ip, port); 524 _fhclient.connect(table, schema); 525 return _fhclient; 526 } 527 528 /** Get existing {@link FinkHBaseClient}. 529 * @return The corresponding {@link FinkHBaseClient}. 530 * @throws LomikelException If not yet created. */ 531 public FinkHBaseClient fhclient() throws LomikelException { 532 if (_fhclient == null) { 533 throw new LomikelException("FinkHBaseClient not initialised"); 534 } 535 return _fhclient; 536 } 537 538 /** Give HBase url. 539 * @return The HBase url as <tt>ip:port:table[:schema]</tt>. */ 540 public String hbaseUrl() { 541 return _fhclientUrl; 542 } 543 544 private FinkHBaseClient _fhclient; 545 546 private String _fhclientUrl; 547 548 private String _now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()).toString(); 549 550 private static String FINK_OBJECTS_WS = "https://api.fink-portal.org/api/v1/objects"; 551 private static String FINK_LATESTS_WS = "https://api.fink-portal.org/api/v1/latests"; 552 private static String FINK_ANOMALY_WS = "https://api.fink-portal.org/api/v1/anomaly"; 553 554 /** Logging . */ 555 private static Logger log = LogManager.getLogger(FinkGremlinRecipies.class); 556 557 }