package com.astrolabsoftware.FinkBrowser.Januser;

import com.Lomikel.Utils.SmallHttpClient;
import com.Lomikel.Utils.NotifierURL;
import com.Lomikel.Utils.MapUtil;
import com.Lomikel.Utils.Pair;
import com.Lomikel.Utils.LomikelException;
import com.Lomikel.HBaser.HBaseClient;
import com.Lomikel.Januser.GremlinRecipies;
import com.Lomikel.Januser.ModifyingGremlinClient;
import com.astrolabsoftware.FinkBrowser.HBaser.FinkHBaseClient;
import com.astrolabsoftware.FinkBrowser.FinkPortalClient.FPC;

// Tinker Pop
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.otherV;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.V;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.fold;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.has;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.not;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.repeat;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.values;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.count;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV;
import static org.apache.tinkerpop.gremlin.process.traversal.P.within;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Graph;

// Janus Graph
import org.janusgraph.core.SchemaViolationException;
import org.janusgraph.graphdb.vertices.StandardVertex;
import org.janusgraph.graphdb.database.StandardJanusGraph;

// HBase
import org.apache.hadoop.hbase.client.Get;

// org.json
import org.json.JSONArray;
import org.json.JSONObject;

// Java Mail
import javax.mail.MessagingException;

// Java
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.TreeMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.Set;
import java.util.HashSet;
import java.util.Optional;
import java.util.Calendar;
import java.util.Date;
import java.text.SimpleDateFormat;
import java.util.NoSuchElementException;

// Log4J
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

/** <code>FinkGremlinRecipies</code> provides various recipes to handle
  * and modify Gremlin Graphs for Fink.
  * @opt attributes
  * @opt operations
  * @opt types
  * @opt visibility
  * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
// TBD: check preconditions for methods which don't work with 'client' creation
public class FinkGremlinRecipies extends GremlinRecipies {

  /** Create and attach to {@link GraphTraversalSource}.
    * @param g The attached {@link GraphTraversalSource}. */
  public FinkGremlinRecipies(GraphTraversalSource g) {
    super(g);
  }

  /** Create and attach to {@link ModifyingGremlinClient}.
    * @param client The attached {@link ModifyingGremlinClient}. */
  public FinkGremlinRecipies(ModifyingGremlinClient client) {
    super(client);
  }

  /** Execute the full chain of new sources correlations analyses.
    * @param classifierNames The names of the {@link Classifiers} to be used.
    * @param filter          The HBase evaluation formula to be applied.
    *                        Ignored if <tt>clss</tt> are specified.
    * @param hbaseUrl        The url of HBase with alerts as <tt>ip:port:table:schema</tt>.
    * @param nLimit          The maximal number of alerts to get from HBase or Fink Portal.
    *                        <tt>0</tt> means no limit.
    * @param timeLimit       How far into the past the search should go (in minutes).
    * @param clss            An array of <em>classes</em> taken from {@link FPC};
    *                        if it contains <tt>*</tt>, get anomalies from {@link FPC};
    *                        if <tt>null</tt>, analyse <em>sources</em> from the HBase database.
    * @param enhance         Whether to expand the tree under all <em>SourcesOfInterest</em> with alerts,
    *                        possibly filled with the requested HBase columns.
    * @param columns         The HBase columns to be copied into graph alerts. May be <tt>null</tt>.
    * @throws LomikelException If anything fails. */
  public void processSourcesOfInterest(String[] classifierNames,
                                       String   filter,
                                       String   hbaseUrl,
                                       int      nLimit,
                                       int      timeLimit,
                                       String[] clss,
                                       boolean  enhance,
                                       String   columns) throws LomikelException {
    Classifiers[] classifiers = new Classifiers[classifierNames.length];
    for (int i = 0; i < classifierNames.length; i++) {
      classifiers[i] = Classifiers.valueOf(classifierNames[i]);
    }
    fillSourcesOfInterest(classifiers, filter, hbaseUrl, nLimit, timeLimit, clss, enhance, columns);
    generateCorrelations(classifiers);
  }
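  // Usage sketch (illustrative only): run the full chain on sources from the last
  // week. The classifier name, HBase url and limits below are hypothetical values,
  // not taken from this class.
  //
  //   GraphTraversalSource g = ...;                                    // attached elsewhere
  //   FinkGremlinRecipies recipies = new FinkGremlinRecipies(g);
  //   recipies.processSourcesOfInterest(new String[]{"FINK"},          // assumed Classifiers constant
  //                                     null,                          // no HBase filter
  //                                     "127.0.0.1:2181:ztf:schema",   // ip:port:table:schema
  //                                     1000,                          // at most 1000 alerts
  //                                     7 * 24 * 60,                   // one week back (in minutes)
  //                                     null,                          // classify sources from HBase
  //                                     false,                         // do not expand to alerts
  //                                     null);                         // no extra columns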
  /** Fill the graph with <em>SourcesOfInterest</em> and expand them to alerts (if requested).
    * @param classifiers The {@link Classifiers}s to be used.
    * @param filter      The HBase evaluation formula to be applied.
    *                    Ignored if <tt>clss</tt> are specified.
    * @param hbaseUrl    The url of HBase with alerts as <tt>ip:port:table:schema</tt>.
    * @param nLimit      The maximal number of alerts to get from HBase or Fink Portal.
    *                    <tt>0</tt> means no limit.
    * @param timeLimit   How far into the past the search should go (in minutes).
    * @param clss        An array of <em>classes</em> taken from {@link FPC};
    *                    if it contains <tt>*</tt>, get anomalies from {@link FPC};
    *                    if <tt>null</tt>, analyse <em>sources</em> from the HBase database.
    * @param enhance     Whether to expand the tree under all <em>SourcesOfInterest</em> with alerts,
    *                    possibly filled with the requested HBase columns.
    * @param columns     The HBase columns to be copied into graph alerts. May be <tt>null</tt>.
    * @throws LomikelException If anything fails. */
  public void fillSourcesOfInterest(Classifiers[] classifiers,
                                    String        filter,
                                    String        hbaseUrl,
                                    int           nLimit,
                                    int           timeLimit,
                                    String[]      clss,
                                    boolean       enhance,
                                    String        columns) throws LomikelException {
    String clssDesc = "";
    if (clss != null) {
      clssDesc = "of " + Arrays.toString(clss);
    }
    log.info("Filling SourcesOfInterest " + clssDesc + " using " + Arrays.toString(classifiers) + " classifiers, nLimit = " + nLimit + ", timeLimit = " + timeLimit);
    log.info("Importing from " + hbaseUrl + ":");
    fhclient(hbaseUrl);
    if (enhance) {
      log.info("\tenhancing with " + columns);
    }
    Set<String> oids = new HashSet<>();
    if (clss == null) {
      fhclient().setEvaluation(filter);
      if (nLimit > 0) {
        fhclient().setLimit(nLimit);
      }
      oids = fhclient().latestsT("i:objectId",
                                 null,
                                 timeLimit,
                                 true);
      fhclient().setEvaluation(null);
    }
    else {
      Calendar cal;
      Date d;
      String sd;
      JSONArray ja;
      JSONObject jo;
      for (String cls : clss) {
        cal = Calendar.getInstance();
        cal.add(Calendar.MINUTE, -timeLimit); // how far into the past to search
        d = cal.getTime();
        sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(d);
        if (cls.equals("*")) {
          ja = FPC.anomaly(new JSONObject().put("n",             nLimit).
                                            put("startdate",     sd).
                                            put("columns",       "i:objectId").
                                            put("output-format", "json"));
        }
        else {
          ja = FPC.latests(new JSONObject().put("n",             nLimit).
                                            put("class",         cls).
                                            put("startdate",     sd).
                                            put("columns",       "i:objectId").
                                            put("output-format", "json"));
        }
        for (int i = 0; i < ja.length(); i++) {
          jo = ja.getJSONObject(i);
          oids.add(jo.getString("i:objectId"));
        }
        log.info("*** " + cls + "[" + ja.length() + "]:");
      }
    }
    classifySources(classifiers, oids, hbaseUrl, enhance, columns);
  }
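  // The Fink Portal calls above post JSON payloads of the form (illustrative
  // values; see https://api.fink-portal.org):
  //
  //   {"n":             1000,
  //    "class":         "SN candidate",
  //    "startdate":     "2024-01-01 00:00:00",
  //    "columns":       "i:objectId",
  //    "output-format": "json"}
  //
  // and collect the returned "i:objectId" values into the set of sources to classify.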
  /** Classify <em>sources</em> and expand them to alerts (if requested).
    * @param classifiers The {@link Classifiers}s to be used.
    * @param oids        The {@link Set} of <tt>objectId</tt>s of the sources to be added.
    * @param hbaseUrl    The url of HBase with alerts as <tt>ip:port:table:schema</tt>.
    * @param enhance     Whether to expand the tree under all <em>SourcesOfInterest</em> with alerts,
    *                    possibly filled with the requested HBase columns.
    * @param columns     The HBase columns to be copied into graph alerts. May be <tt>null</tt>.
    * @throws LomikelException If anything fails. */
  public void classifySources(Classifiers[] classifiers,
                              Set<String>   oids,
                              String        hbaseUrl,
                              boolean       enhance,
                              String        columns) throws LomikelException {
    int size = oids.size();
    int n = 0;
    long dt;
    double freq;
    long startTime = System.currentTimeMillis();
    // loop over all sources
    for (String oid : oids) {
      log.info(oid + " (" + n + " of " + size + "):");
      for (Classifiers classifier : classifiers) {
        try {
          classifySource(classifier, oid, hbaseUrl, enhance, columns);
        }
        catch (LomikelException e) {
          log.error("Cannot get classification for " + oid, e);
        }
      }
      n++;
      dt = (System.currentTimeMillis() - startTime) / 1000;
      freq = dt > 0 ? (double)n / (double)dt : 0.0; // avoid division by zero within the first second
      log.info("\t\twith " + String.format("%.2f", freq) + " Hz");
    }
  }

  /** Classify a <em>source</em> and expand it to alerts (if requested).
    * @param classifier The {@link Classifiers} to be used.
    * @param objectId   The <tt>objectId</tt> of the source to be added.
    * @param hbaseUrl   The url of HBase with alerts as <tt>ip:port:table:schema</tt>.
    * @param enhance    Whether to expand the tree under all <em>SourcesOfInterest</em> with alerts,
    *                   possibly filled with the requested HBase columns.
    * @param columns    The HBase columns to be copied into graph alerts. May be <tt>null</tt>.
    * @throws LomikelException If anything fails. */
  public void classifySource(Classifiers classifier,
                             String      objectId,
                             String      hbaseUrl,
                             boolean     enhance,
                             String      columns) throws LomikelException {
    if (g().V().has("lbl", "source").has("objectId", objectId).hasNext()) {
      Vertex v1 = g().V().has("lbl", "source").has("objectId", objectId).next();
      List<Vertex> v2s = g().V(v1).in().
                                   has("lbl", "SourcesOfInterest").
                                   has("classifier", classifier.name()).
                                   toList();
      Iterator<Edge> edges;
      for (Vertex v2 : v2s) {
        edges = g().V(v1).inE().
                          has("lbl", "deepcontains").
                          where(otherV().is(v2)).
                          toStream().
                          iterator();
        while (edges.hasNext()) {
          edges.next().remove();
        }
      }
      // will be committed in registration
    }
    classifier.instance().classify(this, objectId, enhance, columns);
  }

  /** Register a <em>source</em> in <em>SourcesOfInterest</em>.
    * @param classifier The {@link Classifiers} to be used.
    * @param cls        The type (class) of the <em>SourcesOfInterest</em> {@link Vertex}.
    *                   It will be created if it does not yet exist.
    * @param objectId   The objectId of the new <em>Source</em> {@link Vertex}.
    *                   It will be created if it does not yet exist.
    * @param weight     The weight of the connection.
    *                   Usually the number of <em>Alerts</em> of this type.
    * @param instancesS The <em>jd</em>s of the related <em>Alerts</em> as strings separated by commas.
    *                   Potential square brackets are removed.
    *                   May be <tt>null</tt> or empty.
    * @param enhance    Whether to expand the tree under all <em>SourcesOfInterest</em> with alerts,
    *                   possibly filled with the requested HBase columns.
    * @param columns    The HBase columns to be filled into alerts. May be <tt>null</tt>.
    *                   Ignored if enhancement is not requested. */
  public void registerSourcesOfInterest(Classifiers classifier,
                                        String      cls,
                                        String      objectId,
                                        double      weight,
                                        String      instancesS,
                                        boolean     enhance,
                                        String      columns) {
    Set<Double> instances = new HashSet<>();
    if (instancesS != null && !instancesS.trim().equals("")) {
      for (String instance : instancesS.replaceAll("\\[", "").replaceAll("]", "").split(",")) {
        instances.add(Double.parseDouble(instance));
      }
    }
    registerSourcesOfInterest(classifier, cls, objectId, weight, instances, enhance, columns);
  }
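  // Worked example: instancesS = "[2460000.5, 2460001.5]" is stripped of its
  // brackets, split on commas and parsed, yielding the Set<Double>
  // {2460000.5, 2460001.5}, which is handed to the Set-based variant below.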
  /** Register a <em>source</em> in <em>SourcesOfInterest</em>.
    * @param classifier The {@link Classifiers} to be used.
    * @param cls        The type (class) of the <em>SourcesOfInterest</em> {@link Vertex}.
    *                   It will be created if it does not yet exist.
    * @param objectId   The objectId of the new <em>Source</em> {@link Vertex}.
    *                   It will be created if it does not yet exist.
    * @param weight     The weight of the connection.
    *                   Usually the number of <em>Alerts</em> of this type.
    * @param instances  The <em>jd</em>s of the related <em>Alerts</em>.
    * @param enhance    Whether to expand the tree under all <em>SourcesOfInterest</em> with alerts
    *                   filled with the requested HBase columns.
    * @param columns    The HBase columns to be filled into alerts. May be <tt>null</tt>.
    *                   Ignored if enhancement is not requested. */
  public void registerSourcesOfInterest(Classifiers classifier,
                                        String      cls,
                                        String      objectId,
                                        double      weight,
                                        Set<Double> instances,
                                        boolean     enhance,
                                        String      columns) {
    log.info("\tregistering " + objectId + " as " + cls + " with weight " + weight);
    Vertex soi = g().V().has("lbl",        "SourcesOfInterest").
                         has("classifier", classifier.name()).
                         has("cls",        cls).
                         fold().
                         coalesce(unfold(),
                                  addV("SourcesOfInterest").
                                  property("lbl",        "SourcesOfInterest").
                                  property("classifier", classifier.name()).
                                  property("cls",        cls).
                                  property("technology", "HBase").
                                  property("url",        hbaseUrl())).
                         next();
    Vertex s = g().V().has("lbl",      "source").
                       has("objectId", objectId).
                       fold().
                       coalesce(unfold(),
                                addV("source").
                                property("lbl",      "source").
                                property("objectId", objectId)).
                       property("importDate", _now).
                       next();
    addEdge(g().V(soi).next(),
            g().V(s).next(),
            "deepcontains",
            new String[]{"weight", "instances"},
            new String[]{"" + weight, instances.toString().replaceFirst("\\[", "").replaceAll("]", "")},
            true);
    if (enhance) {
      try {
        enhanceSource(classifier, s, instances.toString().replaceFirst("\\[", "").replaceAll("]", "").split(","), columns);
      }
      catch (LomikelException e) {
        log.error("Cannot enhance source", e);
      }
    }
    commit(); // TBD: not needed if enhancing
  }

  /** Expand the tree under all <em>SourcesOfInterest</em> with alerts
    * filled with the requested HBase columns.
    * @param classifier The {@link Classifiers} to be used.
    * @param columns    The HBase columns to be filled into alerts. May be <tt>null</tt>.
    * @throws LomikelException If anything goes wrong. */
  public void enhanceSourcesOfInterest(Classifiers classifier,
                                       String      columns) throws LomikelException {
    log.info("Expanding all SourcesOfInterest and enhancing them with " + columns);
    for (Object soi : g().V().has("lbl",        "SourcesOfInterest").
                              has("classifier", classifier.name()).
                              values("cls").
                              toSet()) {
      enhanceSourcesOfInterest(classifier, soi.toString().trim(), columns);
    }
  }

  /** Expand the tree under one <em>SourcesOfInterest</em> with alerts
    * filled with the requested HBase columns.
    * @param classifier The {@link Classifiers} to be used.
    * @param cls        The type (class) of <em>SourcesOfInterest</em>.
    * @param columns    The HBase columns to be filled into alerts. May be <tt>null</tt>.
    * @throws LomikelException If anything goes wrong. */
  public void enhanceSourcesOfInterest(Classifiers classifier,
                                       String      cls,
                                       String      columns) throws LomikelException {
    log.info("Expanding " + cls + " SourcesOfInterest and enhancing them with " + columns);
    Vertex soi = g().V().has("lbl",        "SourcesOfInterest").
                         has("classifier", classifier.name()).
                         has("cls",        cls).
                         next();
    fhclient(soi.property("url").value().toString());
    Iterator<Edge> containsIt = soi.edges(Direction.OUT, "deepcontains"); // only source links, not 'overlaps'
    Edge contains;
    Vertex source;
    String[] jds;
    while (containsIt.hasNext()) {
      contains = containsIt.next();
      source = contains.inVertex();
      jds = contains.property("instances").value().toString().replaceFirst("\\[", "").replaceAll("]", "").split(",");
      enhanceSource(classifier, source, jds, columns);
    }
  }
  /** Expand the tree under a <em>source</em> with alerts
    * filled with the requested HBase columns. It also assembles
    * the related AlertsOfInterest.
    * @param classifier The {@link Classifiers} to be used.
    * @param source     The source.
    * @param jds        The <em>jd</em>s of the related <em>Alerts</em>.
    * @param columns    The HBase columns to be filled into alerts. May be <tt>null</tt>.
    * @throws LomikelException If anything goes wrong. */
  public void enhanceSource(Classifiers classifier,
                            Vertex      source,
                            String[]    jds,
                            String      columns) throws LomikelException {
    String objectId = source.property("objectId").value().toString();
    int n = 0;
    String key;
    Vertex alert;
    List<Map<String, String>> results;
    for (String jd : jds) {
      n++;
      key = objectId + "_" + jd.trim();
      alert = g().V().has("lbl",      "alert").
                      has("objectId", objectId).
                      has("jd",       jd).
                      fold().
                      coalesce(unfold(),
                               addV("alert").
                               property("lbl",      "alert").
                               property("objectId", objectId).
                               property("jd",       jd)).
                      property("importDate", _now).
                      next();
      if (columns != null) {
        results = fhclient().results2List(fhclient().scan(key,
                                                          null,
                                                          columns,
                                                          0,
                                                          false,
                                                          false));
        for (Map<String, String> result : results) {
          for (Map.Entry<String, String> entry : result.entrySet()) {
            if (!entry.getKey().split(":")[0].equals("key")) {
              try {
                alert.property(entry.getKey().split(":")[1], entry.getValue());
              }
              catch (SchemaViolationException e) {
                log.error("Cannot enhance " + objectId + "_" + jd + ": " + entry.getKey() + " => " + entry.getValue() + "\n\t" + e.getMessage());
              }
            }
          }
        }
      }
      addEdge(source, alert, "sends");
      assembleAlertsOfInterest(classifier, alert);
    }
    commit();
    log.info("\t\t" + n + " alerts added");
  }

  /** Clean the tree under <em>SourcesOfInterest</em>.
    * Drop alerts. Alerts are dropped even if they have other
    * {@link Edge}s.
    * @param classifier The {@link Classifiers} to be used.
    * @param cls        The type (class) of <em>SourcesOfInterest</em>.
    * @throws LomikelException If anything goes wrong. */
  public void cleanSourcesOfInterest(Classifiers classifier,
                                     String      cls) throws LomikelException {
    log.info("Cleaning " + cls + " SourcesOfInterest");
    g().V().has("lbl",        "SourcesOfInterest").
            has("classifier", classifier.name()).
            has("cls",        cls).
            out().
            out().
            drop().
            iterate();
    commit();
  }

  /** Assemble AlertsOfInterest from all existing alerts.
    * @param classifier The {@link Classifiers} to be used. */
  public void assembleAlertsOfInterest(Classifiers classifier) {
    log.info("Assembling AlertsOfInterest");
    GraphTraversal<Vertex, Vertex> alertT = g().V().has("lbl", "alert");
    Vertex alert;
    while (alertT.hasNext()) {
      alert = alertT.next();
      assembleAlertsOfInterest(classifier, alert);
    }
  }
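  // The vertices and edges handled above form the shape (using the 'lbl' values):
  //
  //   SourcesOfInterest -deepcontains-> source -sends-> alert <-contains- AlertsOfInterest
  //
  // so the alerts of one class can be listed with a traversal like this sketch
  // (the class name is hypothetical):
  //
  //   g().V().has("lbl", "AlertsOfInterest").
  //           has("cls", "SN candidate").
  //           out().
  //           values("objectId", "jd").
  //           toList();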
  /** Assemble AlertsOfInterest from an alert.
    * @param classifier The {@link Classifiers} to be used.
    * @param alert      The existing alert. */
  public void assembleAlertsOfInterest(Classifiers classifier,
                                       Vertex      alert) {
    String jd = alert.property("jd").value().toString();
    Vertex source = alert.edges(Direction.IN, "sends").next().outVertex(); // only the 'sends' Edge comes from the source
    String objectId = source.property("objectId").value().toString();
    Iterator<Edge> containsIt = source.edges(Direction.IN, "deepcontains"); // only Edges from SourcesOfInterest
    String cls = null;
    Edge contains;
    String instances = "";
    String hbaseUrl = "";
    Vertex aoi;
    boolean present;
    // loop over all Edges to SoI
    while (containsIt.hasNext()) {
      contains = containsIt.next();
      instances = contains.property("instances").value().toString();
      // compare jds as numbers, not as strings
      present = false;
      for (String instance : instances.replaceFirst("\\[", "").replaceAll("]", "").split(",")) {
        if (!instance.trim().isEmpty() && Double.parseDouble(instance.trim()) == Double.parseDouble(jd)) {
          present = true;
          break;
        }
      }
      // if the alert jd is present in this SoI Edge => create an AoI and connect it to the alert
      if (present) { // just one SourceOfInterest contains each alert
        cls = contains.outVertex().property("cls").value().toString();
        hbaseUrl = contains.outVertex().property("url").value().toString();
        aoi = g().V().has("lbl",        "AlertsOfInterest").
                      has("classifier", classifier.name()).
                      has("cls",        cls).
                      fold().
                      coalesce(unfold(),
                               addV("AlertsOfInterest").
                               property("lbl",        "AlertsOfInterest").
                               property("classifier", classifier.name()).
                               property("cls",        cls).
                               property("technology", "HBase").
                               property("url",        hbaseUrl)).
                      next();
        addEdge(g().V(aoi).next(),
                g().V(alert).next(),
                "contains",
                new String[]{},
                new String[]{},
                true);
      }
    }
    commit();
  }
  /** Generate <em>overlaps</em> Edges between <em>AlertsOfInterest</em>
    * and between <em>SourcesOfInterest</em>.
    * Possibly between two {@link Classifiers}.
    * @param classifiers The {@link Classifiers} to be used. */
  public void generateCorrelations(Classifiers... classifiers) {
    String[] classifierNames = Arrays.stream(classifiers).map(c -> c.name()).toArray(String[]::new);
    log.info("Generating correlations for Sources and Alerts of Interest for " + Arrays.toString(classifierNames));
    for (String classifierName : classifierNames) {
      // Clean all correlations
      g().V().has("lbl", "AlertsOfInterest" ).has("classifier", classifierName).bothE().has("lbl", "overlaps").drop().iterate();
      g().V().has("lbl", "SourcesOfInterest").has("classifier", classifierName).bothE().has("lbl", "overlaps").drop().iterate();
      // Remove wrong SoI, AoI
      g().V().has("lbl", "AlertsOfInterest" ).has("classifier", classifierName).not(has("cls")).drop().iterate();
      g().V().has("lbl", "SourcesOfInterest").has("classifier", classifierName).not(has("cls")).drop().iterate();
    }
    commit();
    // Accumulate correlations and sizes
    Map<String, Double> weights0 = new HashMap<>();             // cls -> weight (for one source)
    Map<Pair<String, String>, Double> corrS = new HashMap<>();  // [cls1, cls2] -> weight (for all sources between SoI-SoI)
    Map<Pair<String, String>, Double> corrA = new HashMap<>();  // [cls1, cls2] -> weight (for all sources between AoI-AoI)
    Map<String, Double> sizeS = new HashMap<>();                // cls -> total (for all sources of SoI)
    Map<String, Double> sizeA = new HashMap<>();                // cls -> total (for all sources of AoI)
    SortedSet<String> types0 = new TreeSet<>();                 // [cls] (for one source)
    SortedSet<String> types  = new TreeSet<>();                 // [cls] (for all sources)
    Vertex source;
    Iterator<Edge> deepcontainsIt;
    Edge deepcontains;
    double weight;
    double weight2;
    double cor;
    Vertex soi;
    Vertex soi1;
    Vertex soi2;
    Vertex aoi1;
    Vertex aoi2;
    String cls;
    Pair<String, String> rel;
    // Loop over sources and accumulate the weights to each source
    GraphTraversal<Vertex, Vertex> sourceT = g().V().has("lbl", "source");
    while (sourceT.hasNext()) {
      weights0.clear();
      types0.clear();
      source = sourceT.next();
      deepcontainsIt = source.edges(Direction.IN, "deepcontains");
      // Get all weights to this source
      while (deepcontainsIt.hasNext()) {
        deepcontains = deepcontainsIt.next();
        weight = Double.parseDouble(deepcontains.property("weight").value().toString());
        soi1 = deepcontains.outVertex();
        cls = soi1.property("cls").value().toString();
        types0.add(cls);
        types.add(cls);
        weights0.put(cls, weight);
      }
      // Double loop over the accumulated weights and fill the weights between SoIs/AoIs
      for (String cls1 : types0) {
        for (String cls2 : types0) {
          weight2 = weights0.get(cls2);
          rel = Pair.of(cls1, cls2);
          // SoI-SoI
          if (!corrS.containsKey(rel)) {
            corrS.put(rel, 0.0);
          }
          cor = corrS.get(rel);
          corrS.put(rel, cor + 1.0);
          // AoI-AoI
          if (!corrA.containsKey(rel)) {
            corrA.put(rel, 0.0);
          }
          cor = corrA.get(rel);
          corrA.put(rel, cor + weight2);
        }
      }
    }
    // Fill total sizes
    double sizeS0;
    double sizeA0;
    for (String cls1 : types) {
      sizeS0 = 0.0;
      sizeA0 = 0.0;
      for (String cls2 : types) {
        if (corrS.containsKey(Pair.of(cls1, cls2))) {
          sizeS0 += corrS.get(Pair.of(cls1, cls2));
        }
        if (corrA.containsKey(Pair.of(cls1, cls2))) {
          sizeA0 += corrA.get(Pair.of(cls1, cls2));
        }
      }
      sizeS.put(cls1, sizeS0);
      sizeA.put(cls1, sizeA0);
    }
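    // Worked example: a source deep-contained by SoI 'SN' with weight 3 and by
    // SoI 'AGN' with weight 2 (hypothetical classes) contributes
    //   corrS[SN,AGN] += 1.0 (one shared source) and corrA[SN,AGN] += 2.0,
    //   corrS[AGN,SN] += 1.0 and corrA[AGN,SN] += 3.0,
    // plus the diagonal entries [SN,SN] and [AGN,AGN].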
    // Create overlaps
    int ns = 0;
    int na = 0;
    // Loop over SoI and create the corresponding AoI
    String hbaseUrl;
    String classifierName;
    GraphTraversal<Vertex, Vertex> soiT = g().V().has("lbl", "SourcesOfInterest");
    while (soiT.hasNext()) {
      soi = soiT.next();
      hbaseUrl = soi.property("url").value().toString();
      classifierName = soi.property("classifier").value().toString();
      cls = soi.property("cls").value().toString();
      g().V().has("lbl",        "AlertsOfInterest").
              has("classifier", classifierName).
              has("cls",        cls).
              fold().
              coalesce(unfold(),
                       addV("AlertsOfInterest").
                       property("lbl",        "AlertsOfInterest").
                       property("classifier", classifierName).
                       property("cls",        cls).
                       property("technology", "HBase").
                       property("url",        hbaseUrl)).
              iterate();
    }
    // Double-loop over SoI and create an overlaps Edge SoI-SoI if non-empty
    for (String cls1 : types) {
      try {
        soi1 = g().V().has("lbl",        "SourcesOfInterest").
                       has("classifier", within(classifierNames)).
                       has("cls",        cls1).
                       next();
        for (String cls2 : types) {
          if (corrS.containsKey(Pair.of(cls1, cls2))) {
            try {
              soi2 = g().V().has("lbl",        "SourcesOfInterest").
                             has("classifier", within(classifierNames)).
                             has("cls",        cls2).
                             next();
              addEdge(g().V(soi1).next(),
                      g().V(soi2).next(),
                      "overlaps",
                      new String[]{"intersection",
                                   "sizeIn",
                                   "sizeOut"},
                      new Double[]{corrS.get(Pair.of(cls1, cls2)),
                                   sizeS.get(cls1),
                                   sizeS.get(cls2)},
                      true);
              ns++;
            }
            catch (NoSuchElementException e) {
              log.debug("SoI for " + cls2 + " doesn't exist");
            }
          }
        }
      }
      catch (NoSuchElementException e) {
        log.debug("SoI for " + cls1 + " doesn't exist");
      }
    }
    // Double-loop over AoI and create an overlaps Edge AoI-AoI if non-empty
    for (String cls1 : types) {
      try {
        aoi1 = g().V().has("lbl",        "AlertsOfInterest").
                       has("classifier", within(classifierNames)).
                       has("cls",        cls1).
                       next();
        for (String cls2 : types) {
          if (corrA.containsKey(Pair.of(cls1, cls2))) {
            try {
              aoi2 = g().V().has("lbl",        "AlertsOfInterest").
                             has("classifier", within(classifierNames)).
                             has("cls",        cls2).
                             next();
              addEdge(g().V(aoi1).next(),
                      g().V(aoi2).next(),
                      "overlaps",
                      new String[]{"intersection",
                                   "sizeIn",
                                   "sizeOut"},
                      new Double[]{corrA.get(Pair.of(cls1, cls2)),
                                   sizeA.get(cls1),
                                   sizeA.get(cls2)},
                      true);
              na++;
            }
            catch (NoSuchElementException e) {
              log.debug("AoI for " + cls2 + " doesn't exist");
            }
          }
        }
      }
      catch (NoSuchElementException e) {
        log.debug("AoI for " + cls1 + " doesn't exist");
      }
    }
    commit();
    log.info(ns + " source-source and " + na + " alert-alert correlations generated");
  }
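  // Reading the generated correlations back (illustrative sketch; the class name
  // is hypothetical). Each 'overlaps' Edge carries the raw intersection together
  // with the totals of both ends, so a relative overlap can be computed as
  // intersection / sizeIn:
  //
  //   g().V().has("lbl", "SourcesOfInterest").
  //           has("cls", "SN candidate").
  //           outE().has("lbl", "overlaps").
  //           valueMap("intersection", "sizeIn", "sizeOut").
  //           toList();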
  /** Create a new {@link FinkHBaseClient}. Singleton when the url is unchanged.
    * @param hbaseUrl The HBase url as <tt>ip:port:table[:schema]</tt>.
    * @return The corresponding {@link FinkHBaseClient}, created and initialised if needed.
    * @throws LomikelException If it cannot be created. */
  public FinkHBaseClient fhclient(String hbaseUrl) throws LomikelException {
    if (hbaseUrl == null || hbaseUrl.equals(_fhclientUrl)) {
      return fhclient(); // fails if not yet created
    }
    _fhclientUrl = hbaseUrl;
    String[] url = hbaseUrl.split(":");
    String ip = url[0];
    String port = url[1];
    String table = url[2];
    String schema = "";
    if (url.length >= 4) {
      schema = url[3];
    }
    _fhclient = new FinkHBaseClient(ip, port);
    _fhclient.connect(table, schema);
    return _fhclient;
  }

  /** Get the existing {@link FinkHBaseClient}.
    * @return The corresponding {@link FinkHBaseClient}.
    * @throws LomikelException If not yet created. */
  public FinkHBaseClient fhclient() throws LomikelException {
    if (_fhclient == null) {
      throw new LomikelException("FinkHBaseClient not initialised");
    }
    return _fhclient;
  }

  /** Give the HBase url.
    * @return The HBase url as <tt>ip:port:table[:schema]</tt>. */
  public String hbaseUrl() {
    return _fhclientUrl;
  }

  private FinkHBaseClient _fhclient;

  private String _fhclientUrl;

  private String _now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());

  private static final String FINK_OBJECTS_WS = "https://api.fink-portal.org/api/v1/objects";
  private static final String FINK_LATESTS_WS = "https://api.fink-portal.org/api/v1/latests";
  private static final String FINK_ANOMALY_WS = "https://api.fink-portal.org/api/v1/anomaly";

  /** Logging. */
  private static Logger log = LogManager.getLogger(FinkGremlinRecipies.class);

}