001package com.astrolabsoftware.FinkBrowser.Januser;
002
003import com.Lomikel.Utils.SmallHttpClient;
004import com.Lomikel.Utils.NotifierURL;
005import com.Lomikel.Utils.MapUtil;
006import com.Lomikel.Utils.Pair;
007import com.Lomikel.Utils.LomikelException;
008import com.Lomikel.HBaser.HBaseClient;
009import com.Lomikel.Januser.GremlinRecipies;
010import com.Lomikel.Januser.ModifyingGremlinClient;
011import com.astrolabsoftware.FinkBrowser.HBaser.FinkHBaseClient;
012import com.astrolabsoftware.FinkBrowser.FinkPortalClient.FPC;
013
014// Tinker Pop
015import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
016import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
017import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
018import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.otherV;
019import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.V;
020import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.fold;
021import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.has;
022import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.not;
023import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;
024import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
025import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.repeat;
026import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.values;
027import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.count;
028import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV;
029import static org.apache.tinkerpop.gremlin.process.traversal.P.within;
030import org.apache.tinkerpop.gremlin.structure.Edge;
031import org.apache.tinkerpop.gremlin.structure.Vertex;
032import org.apache.tinkerpop.gremlin.structure.Property;
033import org.apache.tinkerpop.gremlin.structure.VertexProperty;
034import org.apache.tinkerpop.gremlin.structure.Direction;
035import org.apache.tinkerpop.gremlin.structure.Graph;
036
037// Janus Graph
038import org.janusgraph.core.SchemaViolationException;
039import org.janusgraph.graphdb.vertices.StandardVertex;
040import org.janusgraph.graphdb.database.StandardJanusGraph;
041
042// HBase
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Get;
045
046// org.json
047import org.json.JSONArray;
048import org.json.JSONObject;
049
050// Java Mail
051import javax.mail.MessagingException;
052
053// Java
054import java.util.Arrays;
055import java.util.stream.Collectors;
056import java.util.Iterator;
057import java.util.ArrayList;
058import java.util.List;
059import java.util.Map;
060import java.util.HashMap;
061import java.util.LinkedHashMap;
062import java.util.TreeMap;
063import java.util.SortedSet;
064import java.util.TreeSet;
065import java.util.Set;
066import java.util.HashSet;
067import java.util.Iterator;
068import java.util.Optional;
069import java.util.Calendar;
070import java.util.Date;
071import java.text.SimpleDateFormat;
072import java.util.NoSuchElementException;
073
074// Log4J
075import org.apache.logging.log4j.Logger;
076import org.apache.logging.log4j.LogManager;
077
/** <code>FinkGremlinRecipies</code> provides various recipes to handle
  * and modify Gremlin Graphs for Fink.
  * @opt attributes
  * @opt operations
  * @opt types
  * @opt visibility
  * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
// TBD: check precondition for methods, which doesn't work with 'client' creation
086public class FinkGremlinRecipies extends GremlinRecipies {
087    
  /** Create recipes attached to an existing {@link GraphTraversalSource}.
    * All graph operations of this instance run through the given traversal source.
    * @param g The attached {@link GraphTraversalSource}. */
  public FinkGremlinRecipies(GraphTraversalSource g) {
    super(g);
    }
093    
  /** Create recipes attached to an existing {@link ModifyingGremlinClient}.
    * All graph operations of this instance run through the given client.
    * @param client The attached {@link ModifyingGremlinClient}. */
  public FinkGremlinRecipies(ModifyingGremlinClient client) {
    super(client);
    }
099    
100  /** Execute full chain of new sources correlations analyses.
101    * @param classifierNames The names of the {@link Classifier} to be used.
102    * @param filter          The HBase evaluation formula to be applied.
103    *                        Ignored if <tt>clss</tt> are specified.
104    * @param hbaseUrl        The url of HBase with alerts as <tt>ip:port:table:schema</tt>.
105    * @param nLimit          The maximal number of alerts getting from HBase or Fink Portal.
106    *                        <tt>0</tt> means no limit.
107    * @param timeLimit       How far into the past the search should search (in minutes).
108    * @param clss            An array of <em>classes</em> taken from {@link FPC},
109    *                        if contains <tt>Anomaly</tt>, get anomalies from {@link FPC},                  
110    *                        if <tt>null</tt>, analyse <em>sources</em> from HBase database.
111    * @param enhance         Whether expand tree under all <em>SourcesOfInterest</em> with alerts
112    *                        possibly filled with requested HBase columns.
113    * @param columns         HBase columns to be copied into graph alerts. May be <tt>null</tt>.
114    * @throws LomikelException If anything fails. */
115  public void processSourcesOfInterest(String[] classifierNames,
116                                       String   filter,
117                                       String   hbaseUrl,
118                                       int      nLimit,
119                                       int      timeLimit,
120                                       String[] clss,
121                                       boolean  enhance,
122                                       String   columns) throws LomikelException {
123    Classifiers[] classifiers = new Classifiers[classifierNames.length];
124    for (int i = 0; i < classifierNames.length; i++) {
125      classifiers[i] = Classifiers.valueOf(classifierNames[i]);
126      }
127    fillSourcesOfInterest(classifiers, filter, hbaseUrl, nLimit, timeLimit, clss, enhance, columns);
128    generateCorrelations(classifiers);
129    }
130        
131  /** Fill graph with <em>SourcesOfInterest</em> and expand them to alerts (if requested).
132    * @param classifiers The {@link Classifiers}s to be used.
133    * @param filter          The HBase evaluation formula to be applied.
134    *                        Ignored if <tt>clss</tt> are specified.
135    * @param hbaseUrl    The url of HBase with alerts as <tt>ip:port:table:schema</tt>.
136    * @param nLimit      The maximal number of alerts getting from HBase or Fink Portal.
137    *                    <tt>0</tt> means no limit.
138    * @param timeLimit   How far into the past the search should search (in minutes).
139    * @param clss        An array of <em>classes</em> taken from {@link FPC},
140    *                    if contains <tt>Anomaly</tt>, get anomalies from {@link FPC},                  
141    *                    if <tt>null</tt>, analyse <em>sources</em> from HBase database.
142    * @param enhance     Whether expand tree under all <em>SourcesOfInterest</em> with alerts
143    *                    possibly filled with requested HBase columns.
144    * @param columns     The HBase columns to be copied into graph alerts. May be <tt>null</tt>.
145    * @throws LomikelException If anything fails. */
146  public void fillSourcesOfInterest(Classifiers[] classifiers,
147                                    String        filter,
148                                    String        hbaseUrl,
149                                    int           nLimit,
150                                    int           timeLimit,
151                                    String[]      clss,
152                                    boolean       enhance,
153                                    String        columns) throws LomikelException {
154    String clssDesc = "";
155    if (clss != null) {
156      clssDesc = "of " + Arrays.toString(clss);
157      }
158    log.info("Filling SourcesOfInterest " + clssDesc + " using " + Arrays.toString(classifiers) + " classifiers, nLimit = " + nLimit + ", timeLimit = " + timeLimit);
159    log.info("Importing from " + hbaseUrl + ":");
160    fhclient(hbaseUrl);
161    if (enhance) {
162      log.info("\tenhancing with " + columns);
163      }
164    Set<String> oids = new HashSet<>();;
165    if (clss == null) { 
166      fhclient().setEvaluation(filter);
167      if (nLimit > 0) {
168        fhclient().setLimit(nLimit);
169        }
170      oids = fhclient().latestsT("i:objectId",
171                                 null,
172                                 timeLimit,
173                                 true);
174      fhclient().setEvaluation(null);
175      }
176    else {
177      Calendar cal;
178      Date d;
179      String sd;
180      JSONArray ja;
181      JSONObject jo;
182      for (String cls : clss) {
183        cal = Calendar.getInstance();
184        cal.add(Calendar.MINUTE, -nLimit);
185        d = cal.getTime();
186        sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(d);
187        if (cls.equals("*")) {
188          ja = FPC.anomaly(new JSONObject().put("n",             nLimit).
189                                            put("startdate",     sd).
190                                            put("columns",       "i:objectId").
191                                            put("output-format", "json"));
192          }
193        else {
194          ja = FPC.latests(new JSONObject().put("n",             nLimit).
195                                            put("class",         cls).
196                                            put("startdate",     sd).
197                                            put("columns",       "i:objectId").
198                                            put("output-format", "json"));
199          }
200        for (int i = 0; i < ja.length(); i++) {
201          jo = ja.getJSONObject(i);
202          oids.add(jo.getString("i:objectId"));
203          }
204        log.info("*** " + cls + "[" + ja.length() + "]:");
205        }
206      }
207    classifySources(classifiers, oids, hbaseUrl, enhance, columns);
208    }
209    
210  /** Classify <em>source</em> and expand them to alerts (if requested).
211    * @param classifiers The {@link Classifiers}s to be used.
212    * @param oids        The {@link Set} of <tt>objectId</tt>s of source to be added.
213    * @param hbaseUrl    The url of HBase with alerts as <tt>ip:port:table:schema</tt>.
214    * @param enhance     Whether expand tree under all <em>SourcesOfInterest</em> with alerts
215    *                    possibly filled with requested HBase columns.
216    * @param columns     The HBase columns to be copied into graph alerts. May be <tt>null</tt>. 
217    * @throws LomikelException If anything fails. */
218  public void classifySources(Classifiers[] classifiers,
219                              Set<String>   oids,
220                              String        hbaseUrl,
221                              boolean       enhance,
222                              String        columns) throws LomikelException {
223    int size = oids.size();
224    int n = 0;
225    long dt;
226    double freq;
227    long startTime = System.currentTimeMillis();
228    // loop over all sources
229    for (String oid : oids) {
230      log.info(oid + " (" + n + " of " + size + "):");
231      for (Classifiers classifier : classifiers) {
232        try {
233          classifySource(classifier, oid, hbaseUrl, enhance, columns);
234          }
235        catch (LomikelException e) {
236          log.error("Cannot get classification for " + oid, e);
237          }
238        }
239      n++;
240      dt = (System.currentTimeMillis() - startTime) / 1000;
241      freq = (double)n / (double)dt;
242      log.info("\t\twith " + String.format("%.2f", freq) + " Hz");
243      }
244    }
245    
  /** Classify a single <em>source</em> and expand it to alerts (if requested).
    * If the source already exists in the graph, all its <em>deepcontains</em>
    * {@link Edge}s coming from <em>SourcesOfInterest</em> of the same classifier
    * are removed first, so the classification is refreshed rather than accumulated.
    * @param classifier The {@link Classifiers} classifier to be used.
    * @param objectId   The <tt>objectId</tt> of the source to be added.
    * @param hbaseUrl   The url of HBase with alerts as <tt>ip:port:table:schema</tt>.
    * @param enhance    Whether expand tree under all <em>SourcesOfInterest</em> with alerts
    *                   possibly filled with requested HBase columns.
    * @param columns    The HBase columns to be copied into graph alerts. May be <tt>null</tt>. 
    * @throws LomikelException If anything fails. */
  public void classifySource(Classifiers classifier,
                             String      objectId,
                             String      hbaseUrl,
                             boolean     enhance,
                             String      columns) throws LomikelException {  
    // only clean up when the source vertex already exists
    if (g().V().has("lbl", "source").has("objectId", objectId).hasNext()) {
      Vertex v1 = g().V().has("lbl", "source").has("objectId", objectId).next();
      // all SourcesOfInterest of this classifier currently pointing at the source
      List<Vertex> v2s = g().V(v1).in().
                                   has("lbl", "SourcesOfInterest").
                                   has("classifier", classifier.name()).
                                   toList();
      Iterator<Edge> edges;
      for (Vertex v2 : v2s) {
        // drop every deepcontains Edge between this SourcesOfInterest and the source
        edges = g().V(v1).inE().
                          has("lbl", "deepcontains").
                          where(otherV().
                          is(v2)).
                          toStream().
                          iterator();
        while (edges.hasNext()) {
          edges.next().remove(); 
          }
        }        
      // will be commited in registration
      }
    // delegate the actual classification (and optional enhancement) to the classifier
    classifier.instance().classify(this, objectId, enhance, columns);
    }
281       
282  /** Register  <em>source</em> in <em>SourcesOfInterest</em>.
283    * @param classifier The {@link Classifier} to be used.
284    * @param cls        The type (class) of <em>SourcesOfInterest</em> {@link Vertex}.
285    *                   It will be created if not yet exists.
286    * @param objectId   The objectId of the new <em>Source</em> {@link Vertex}.
287    *                   It will be created if not yet exists.
288    * @param weight     The weight of the connection.
289    *                   Usualy the number of <em>Alerts</em> of this type. 
290    * @param instanceS  The <em>jd</em> of related <em>Alerts</em> as strings separated by comma.
291    *                   Potential square brackets are removed.
292    *                   May be <tt>null</tt> or empty.
293    * @param enhance    Whether expand tree under all <em>SourcesOfInterest</em> with alerts
294    *                   possibly filled with requested HBase columns.
295    * @param columns    The HBase columns to be filled into alerts. May be <tt>null</tt>.
296    *                   Ignored if enhancement not requested. */
297  public void registerSourcesOfInterest(Classifiers classifier,
298                                        String      cls,
299                                        String      objectId,
300                                        double      weight,
301                                        String      instancesS,
302                                        boolean     enhance,
303                                        String      columns) {   
304    Set<Double> instances = new HashSet<>();
305    if (instancesS != null && !instancesS.trim().equals("")) {
306      for (String instance : instancesS.replaceAll("\\[", "").replaceAll("]", "").split(",")) {
307        instances.add(Double.parseDouble(instance));
308        }
309      }
310    registerSourcesOfInterest(classifier, cls, objectId, weight, instances, enhance, columns);
311    }
312    
  /** Register a <em>source</em> in <em>SourcesOfInterest</em>.
    * Both the <em>SourcesOfInterest</em> and the <em>source</em> {@link Vertex} are
    * get-or-created (upsert via fold/coalesce), then connected with a
    * <em>deepcontains</em> {@link Edge} carrying the weight and the alert instances.
    * Commits at the end (unless enhancement already committed).
    * @param classifier The {@link Classifiers} classifier to be used.
    * @param cls        The type (class) of <em>SourcesOfInterest</em> {@link Vertex}.
    *                   It will be created if not yet exists.
    * @param objectId   The objectId of the new <em>Source</em> {@link Vertex}.
    *                   It will be created if not yet exists.
    * @param weight     The weight of the connection.
    *                   Usually the number of <em>Alerts</em> of this type.
    * @param instances  The <em>jd</em> of related <em>Alerts</em>.
    * @param enhance    Whether expand tree under all <em>SourcesOfInterest</em> with alerts
    *                   filled with requested HBase columns.
    * @param columns    The HBase columns to be filled into alerts. May be <tt>null</tt>.
    *                   Ignored if enhancement not requested. */
  public void registerSourcesOfInterest(Classifiers classifier,
                                        String      cls,
                                        String      objectId,
                                        double      weight,
                                        Set<Double> instances,
                                        boolean     enhance,
                                        String      columns) {   
    log.info("\tregistering " + objectId + " as " + cls + " with weight " + weight);
    // get-or-create the SourcesOfInterest vertex for this classifier and class
    Vertex soi = g().V().has("lbl",        "SourcesOfInterest").
                         has("classifier", classifier.name()).
                         has("cls",        cls).
                         fold().
                         coalesce(unfold(), 
                                  addV("SourcesOfInterest").
                                  property("lbl",        "SourcesOfInterest").
                                  property("classifier", classifier.name()  ).
                                  property("cls",        cls                ).
                                  property("technology", "HBase"            ).
                                  property("url",        hbaseUrl()         )).
                         next();
    // get-or-create the source vertex and stamp its import date
    Vertex s = g().V().has("lbl", "source").
                       has("objectId", objectId).
                       fold().
                       coalesce(unfold(), 
                                addV("source").
                                property("lbl",      "source").
                                property("objectId", objectId)).
                       property("importDate", _now).
                       next();
    // deepcontains Edge carries the weight and the jd list (brackets stripped)
    addEdge(g().V(soi).next(),
            g().V(s).next(),
            "deepcontains",
            new String[]{"weight",    "instances"                                                     },
            new String[]{"" + weight, instances.toString().replaceFirst("\\[", "").replaceAll("]", "")},
            true);
    if (enhance) {
      try {
        enhanceSource(classifier, s, instances.toString().replaceFirst("\\[", "").replaceAll("]", "").split(","), columns);
        }
      catch (LomikelException e) {
        log.error("Cannot enhance source", e);
        }
      }
    commit(); // TBD: not needed if enhancing
    }
371        
372  /** Expand tree under all <em>SourcesOfInterest</em> with alerts
373    * filled with requested HBase columns.
374    * @param classifier The {@link Classifier} to be used.
375    * @param columns    The HBase columns to be filled into alerts. May be <tt>null</tt>.
376    * @throws LomikelException If anything goes wrong. */
377  public void enhanceSourcesOfInterest(Classifiers classifier,
378                                       String      columns) throws LomikelException {
379    log.info("Expanding all SourcesOfInterest and enhancing them with " + columns);
380    for (Object soi : g().V().has("lbl",        "SourcesOfInterest").
381                              has("classifier", classifier.name()  ).
382                              values("cls"                         ).
383                              toSet()) {
384      enhanceSourcesOfInterest(classifier, soi.toString().trim(), columns);
385      }
386    }
387
  /** Expand the tree under one <em>SourcesOfInterest</em> with alerts
    * filled with requested HBase columns.
    * Walks the outgoing Edges of the SoI vertex and enhances each connected source
    * with the alerts listed in the Edge's <tt>instances</tt> property.
    * @param classifier The {@link Classifiers} classifier to be used.
    * @param cls        The type (class) of <em>SourcesOfInterest</em>.
    * @param columns    The HBase columns to be filled into alerts. May be <tt>null</tt>.
    * @throws LomikelException If anything goes wrong. */
  public void enhanceSourcesOfInterest(Classifiers classifier,
                                       String      cls,
                                       String      columns) throws LomikelException {
    log.info("Expanding " + cls + " SourcesOfInterest and enhancing them with " + columns);
    // NOTE(review): next() throws NoSuchElementException if no such SoI exists - confirm callers guarantee it
    Vertex soi = g().V().has("lbl",        "SourcesOfInterest").
                         has("classifier", classifier.name()  ).
                         has("cls",        cls                ).
                         next();
    // (re)connect the HBase client to the url stored on the SoI vertex
    fhclient(soi.property("url").value().toString());
    // NOTE(review): iterates ALL outgoing Edges without a label filter - assumes only deepcontains Edges exist here
    Iterator<Edge> containsIt = soi.edges(Direction.OUT);
    Edge contains;
    Vertex source;
    String objectId;
    String[] jds;
    while (containsIt.hasNext()) {
      contains = containsIt.next();
      source = contains.inVertex();
      objectId = source.property("objectId").value().toString();
      // instances is a comma-separated jd list, possibly wrapped in brackets
      jds = contains.property("instances").value().toString().replaceFirst("\\[", "").replaceAll("]", "").split(",");
      enhanceSource(classifier, source, jds, columns);
      }
    }
416 
  /** Expand the tree under a <em>source</em> with alerts
    * filled with requested HBase columns. It also assembles
    * related AlertsOfInterest.
    * Each alert Vertex is get-or-created (upsert via fold/coalesce) under the key
    * <tt>objectId_jd</tt>; requested HBase columns are copied onto it as properties.
    * Commits once at the end.
    * @param classifier The {@link Classifiers} classifier to be used.
    * @param source     The source {@link Vertex}; must carry an <tt>objectId</tt> property.
    * @param jds        The <em>jd</em> of related <em>Alerts</em>.
    * @param columns    The HBase columns to be filled into alerts. May be <tt>null</tt>.
    * @throws LomikelException If anything goes wrong. */
  public void enhanceSource(Classifiers classifier,
                            Vertex      source,
                            String[]    jds,
                            String      columns) throws LomikelException {
    String objectId = source.property("objectId").value().toString();
    int n = 0;
    String key;    
    Vertex alert;
    List<Map<String, String>> results;
    for (String jd : jds) {
      n++;
      // HBase row key convention: objectId_jd
      key = objectId + "_" + jd.trim();
      // get-or-create the alert vertex and stamp its import date
      alert = g().V().has("lbl",      "alert").
                      has("objectId", objectId).
                      has("jd",       jd).
                      fold().
                      coalesce(unfold(), 
                               addV("alert").
                               property("lbl",     "alert"  ).
                               property("objectId", objectId).
                               property("jd",       jd      )).
                      property("importDate", _now).
                      next();
      if (columns != null) {
        // fetch the requested columns for this alert row from HBase
        results = fhclient().results2List(fhclient().scan(key,
                                                          null,
                                                          columns,
                                                          0,
                                                          false,
                                                          false));        
        for (Map<String, String> result : results) {
          for (Map.Entry<String, String> entry : result.entrySet()) {
            // skip the row-key pseudo-column; strip the family prefix from real columns
            if (!entry.getKey().split(":")[0].equals("key")) {
              try {
                alert.property(entry.getKey().split(":")[1], entry.getValue());
                }
              catch (SchemaViolationException e) {
                // property not allowed by the graph schema: log and keep going
                log.error("Cannot enhance " + objectId + "_" + jd + ": " + entry.getKey() + " => " + entry.getValue() + "\n\t" + e.getMessage());
                }
              }
            }
          }
        }
      addEdge(source, alert, "sends");
      assembleAlertsOfInterest(classifier, alert);
      }
    commit();
    log.info("\t\t" + n + " alerts added");
    }
474   
475  /** Clean tree under <em>SourcesOfInterest</em>.
476    * Drop alerts. Alerts are dropped even if they have other
477    * {@link Edge}s.
478    * @param classifier The {@link Classifier} to be used.
479    * @param cls        The type (class) of <em>SourcesOfInterest</em>.
480    * @throws LomikelException If anything goes wrong. */
481  public void cleanSourcesOfInterest(Classifiers classifier,
482                                     String      cls) throws LomikelException {
483    log.info("Cleaning " + cls + " SourcesOfInterest");
484    g().V().has("lbl",        "SourcesOfInterest").
485            has("classifier", classifier.name()  ).
486            has("cls",        cls                ).
487            out().
488            out().
489            drop().
490            iterate();
491    commit();
492    }
493        
494  /** Assemble AlertsOfInterest from all existing alerts. 
495    * @param classifier The {@link Classifier} to be used. */ 
496  public void assembleAlertsOfInterest(Classifiers classifier) {
497    log.info("Assembling AlertsOfInterest");
498    GraphTraversal<Vertex, Vertex> alertT = g().V().has("lbl", "alert");
499    Vertex alert;
500    while (alertT.hasNext()) {
501      alert = alertT.next();
502      assembleAlertsOfInterest(classifier, alert);
503      }
504    }
505    
  /** Assemble AlertsOfInterest from an alert.
    * Finds the SoI Edge whose <tt>instances</tt> list mentions this alert's <em>jd</em>,
    * get-or-creates the matching <em>AlertsOfInterest</em> Vertex and connects it to
    * the alert with a <em>contains</em> {@link Edge}. Commits at the end.
    * @param classifier The {@link Classifiers} classifier to be used.
    * @param alert      The existing alert {@link Vertex}; must carry a <tt>jd</tt> property. */
  public void assembleAlertsOfInterest(Classifiers classifier,
                                       Vertex      alert) {
    String jd = alert.property("jd").value().toString();
    Vertex source = alert.edges(Direction.IN).next().outVertex(); // BUG: should check lbl == source
    String objectId = source.property("objectId").value().toString();
    Iterator<Edge> containsIt =  source.edges(Direction.IN); // BUG: should check lbl == SoI
    String cls = null;
    Edge contains;
    String instances = "";
    String hbaseUrl = "";
    String key;
    Vertex aoi;
    // loop over all Edges to SoI
    while (containsIt.hasNext()) {
      contains = containsIt.next();
      instances = contains.property("instances").value().toString();
      // BUG: jd should not be compared as strings (substring match may yield false positives)
      // if alert jd present in this SoI Edge => create AoI and connect it to alert
      if (instances.contains(jd)) { // just one SourceOfInterest contains each alert
        // AoI inherits cls and url from the owning SoI vertex
        cls      = contains.outVertex().property("cls").value().toString();
        hbaseUrl = contains.outVertex().property("url").value().toString();
        key = objectId + "_" + jd;
        // get-or-create the AlertsOfInterest vertex for this classifier and class
        aoi = g().V().has("lbl",        "AlertsOfInterest").
                      has("classifier", classifier.name() ).
                      has("cls",        cls               ).
                      fold().
                      coalesce(unfold(), 
                               addV("AlertsOfInterest").
                               property("lbl",        "AlertsOfInterest").
                               property("classifier", classifier.name() ).
                               property("cls",        cls               ).
                               property("technology", "HBase"           ).
                               property("url",        hbaseUrl          )).
                      next();
        addEdge(g().V(aoi).next(),
                g().V(alert).next(),
                "contains",
                new String[]{},
                new String[]{},
                true);
        }
      }
    commit();
    }
553        
554  /** Generate <em>overlaps</em> Edges between <em>AlertsOfInterest</em> and <em>SourcesOfInterest</em>.
555    * Possibly between two {@link Classifier}s.
556    * @param classifiers The {@link Classifier}s to be used. */
557  public void generateCorrelations(Classifiers... classifiers) {
558    String[] classifierNames = Arrays.stream(classifiers).map(c -> c.name()).toArray(String[]::new);
559    log.info("Generating correlations for Sources and Alerts of Interest for " + Arrays.toString(classifierNames));
560    for (String classifierName : classifierNames) {
561      // Clean all correlations 
562      g().V().has("lbl", "AlertsOfInterest" ).has("classifier", classifierName).bothE().has("lbl", "overlaps").drop().iterate();
563      g().V().has("lbl", "SourcesOfInterest").has("classifier", classifierName).bothE().has("lbl", "overlaps").drop().iterate();
564      // Remove wrong SoI, AoI
565      g().V().has("lbl", "AlertsOfInterest" ).has("classifier", classifierName).not(has("cls")).drop().iterate();
566      g().V().has("lbl", "SourcesOfInterest").has("classifier", classifierName).not(has("cls")).drop().iterate();
567      }
568    commit();
569    // Accumulate correlations and sizes
570    Map<String, Double>               weights0 = new HashMap<>(); // cls -> weight (for one source)
571    Map<Pair<String, String>, Double> corrS    = new HashMap<>(); // [cls1, cls2] -> weight (for all sources between SoI-SoI)
572    Map<Pair<String, String>, Double> corrA    = new HashMap<>(); // [cls1, cls2] -> weight (for all sources between AoI-AoI)
573    Map<String, Double>               sizeS    = new HashMap<>(); // cls -> total (for all sources of SoI)
574    Map<String, Double>               sizeA    = new HashMap<>(); // cls -> total (for all sources of AoI)
575    SortedSet<String>                 types0   = new TreeSet<>(); // [cls] (for one source)
576    SortedSet<String>                 types    = new TreeSet<>(); // [cls] (for all sources)
577    Vertex source;
578    Iterator<Edge> deepcontainsIt;
579    Edge deepcontains;
580    double weight;
581    double weight1;
582    double weight2;
583    double cor;
584    Vertex soi;
585    Vertex soi1;
586    Vertex soi2;
587    Vertex aoi1;
588    Vertex aoi2;
589    String cls;
590    Pair<String, String> rel;
591    // Loop over sources and accumulated weights to each source
592    GraphTraversal<Vertex, Vertex> sourceT = g().V().has("lbl", "source");
593    while (sourceT.hasNext()) {
594      weights0.clear();
595      types0.clear();
596      source = sourceT.next();
597      deepcontainsIt = source.edges(Direction.IN);
598      // Get all weights to this source
599      while (deepcontainsIt.hasNext()) {
600        deepcontains = deepcontainsIt.next();
601        weight = Double.parseDouble(deepcontains.property("weight").value().toString());
602        soi1 = deepcontains.outVertex();
603        cls = soi1.property("cls").value().toString();
604        types0.add(cls);
605        types.add(cls);
606        weights0.put(cls, weight);
607        }
608      // Double loop over accumulated weights and fill weights between SoIs/AoIs
609      for (String cls1 : types0) {
610        weight1 = weights0.get(cls1);
611        for (String cls2 : types0) {
612          weight2 = weights0.get(cls2);
613          rel = Pair.of(cls1, cls2);
614          // SoI-SoI
615          if (!corrS.containsKey(rel)) {
616            corrS.put(rel, 0.0);
617            }
618          cor = corrS.get(rel);
619          corrS.put(rel, cor + 1.0);
620          // AoI-AoI
621          if (!corrA.containsKey(rel)) {
622            corrA.put(rel, 0.0);
623            }
624          cor = corrA.get(rel);
625          corrA.put(rel, cor + weight2);
626          }
627        }
628      }
629    // Fill total sizes
630    double sizeS0;
631    double sizeA0;
632    for (String cls1 : types) {
633      sizeS0 = 0.0;
634      sizeA0 = 0.0;
635      for (String cls2 : types) {
636        if (corrS.containsKey(Pair.of(cls1, cls2))) {
637          sizeS0 += corrS.get(Pair.of(cls1, cls2));
638          }
639        if (corrA.containsKey(Pair.of(cls1, cls2))) {
640          sizeA0 += corrA.get(Pair.of(cls1, cls2));
641          }
642        }
643      sizeS.put(cls1, sizeS0);
644      sizeA.put(cls1, sizeA0);
645      }
646    // Create overlaps
647    int ns = 0;
648    int na = 0;
649    // Loop over SoI and create AoI
650    String hbaseUrl;
651    String classifierName;
652    GraphTraversal<Vertex, Vertex> soiT = g().V().has("lbl", "SourcesOfInterest");
653    while (soiT.hasNext()) {
654      soi = soiT.next();
655      hbaseUrl = soi.property("url").value().toString();
656      classifierName = soi.property("classifier").value().toString();
657      cls = soi.property("cls").value().toString();
658      g().V().has("lbl",        "AlertsOfInterest").
659              has("classifier", classifierName    ).
660              has("cls",        cls               ).
661              fold().
662              coalesce(unfold(), 
663                       addV("AlertsOfInterest").
664                       property("lbl",        "AlertsOfInterest").
665                       property("classifier", classifierName    ).
666                       property("cls",        cls               ).
667                       property("technology", "HBase"           ).
668                       property("url",        hbaseUrl          )).
669              iterate();
670      }
671    // Double-loop over SoI and create overlaps Edge SoI-SoI if non empty 
672    for (String cls1 : types) {
673      try {
674        soi1 = g().V().has("lbl",        "SourcesOfInterest"    ).
675                       has("classifier", within(classifierNames)).
676                       has("cls",        cls1                   ).
677                       next();
678        for (String cls2 : types) {
679          if (corrS.containsKey(Pair.of(cls1, cls2))) {
680            try {
681              soi2 = g().V().has("lbl",        "SourcesOfInterest"    ).
682                             has("classifier", within(classifierNames)).
683                             has("cls",        cls2                   ).
684                             next();
685              addEdge(g().V(soi1).next(),
686                      g().V(soi2).next(),
687                      "overlaps",
688                      new String[]{"intersection",                
689                                   "sizeIn",            
690                                   "sizeOut"},
691                      new Double[]{corrS.get(Pair.of(cls1, cls2)),
692                                   sizeS.get(cls1),
693                                   sizeS.get(cls2)},
694                      true);
695              ns++;
696              }
697            catch (NoSuchElementException e) {
698              log.debug("SoI for " + cls2 + " doesn't exist");
699              }          
700            }  
701          }
702        }
703      catch (NoSuchElementException e) {
704        log.debug("SoI for " + cls1 + " doesn't exist");
705        }          
706      }
707    // Double-loop over AoI and create overlaps Edge AoI-AoI if non empty 
708    for (String cls1 : types) {
709      try {
710        aoi1 = g().V().has("lbl",        "AlertsOfInterest"     ).
711                       has("classifier", within(classifierNames)). 
712                       has("cls",        cls1                   ).
713                       next();
714        for (String cls2 : types) {
715          if (corrA.containsKey(Pair.of(cls1, cls2))) {
716            try {
717              aoi2 = g().V().has("lbl",        "AlertsOfInterest"     ).
718                             has("classifier", within(classifierNames)).
719                             has("cls",        cls2                   ).
720                             next();
721              addEdge(g().V(aoi1).next(),
722                      g().V(aoi2).next(),
723                      "overlaps",
724                      new String[]{"intersection",                
725                                   "sizeIn",            
726                                   "sizeOut"},
727                      new Double[]{corrA.get(Pair.of(cls1, cls2)),
728                                   sizeA.get(cls1),
729                                   sizeA.get(cls2)},
730                      true);
731              na++;
732              }
733            catch (NoSuchElementException e) {
734              log.debug("AoI for " + cls2 + " doesn't exist");
735              }          
736            }  
737          }
738        }
739      catch (NoSuchElementException e) {
740        log.debug("SoI for " + cls1 + " doesn't exist");
741        }          
742      }
743    commit();
744    log.info("" + ns + ", " + na + " source-source and source-alert correlations generated");
745    }
746        
747  /** Create a new {@link FinkHBaseClient}. Singleton when url unchanged.
748    * @param hbaseUrl The HBase url as <tt>ip:port:table[:schema]</tt>.
749    * @return          The corresponding {@link FinkHBaseClient}, created and initialised if needed.
750    * @throws LomikelException If cannot be created. */
751  public FinkHBaseClient fhclient(String hbaseUrl) throws LomikelException {
752    if (hbaseUrl == null || hbaseUrl.equals(_fhclientUrl)) {
753      return _fhclient;
754      }
755    _fhclientUrl = hbaseUrl;
756    String[] url = hbaseUrl.split(":");
757    String ip     = url[0];
758    String port   = url[1];
759    String table  = url[2];
760    String schema = "";
761    if (url.length >= 4) {
762      schema = url[3];
763      }
764    _fhclient = new FinkHBaseClient(ip, port);
765    _fhclient.connect(table, schema);
766    return _fhclient;
767    }
768    
769  /** Get existing {@link FinkHBaseClient}.
770    * @return The corresponding {@link FinkHBaseClient}.
771    * @throws LomikelException If not yet created. */
772  public FinkHBaseClient fhclient() throws LomikelException {
773    if (_fhclient == null) {
774      throw new LomikelException("FinkHBaseClient not initialised");
775      }
776    return _fhclient;
777    }
778    
779  /** Give HBase url.
780    * @return The HBase url as <tt>ip:port:table[:schema]</tt>. */
781  public String hbaseUrl() {
782    return _fhclientUrl;
783    }
784    
785  private FinkHBaseClient _fhclient;
786  
787  private String _fhclientUrl;
788   
789  private String _now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()).toString();
790 
791  private static String FINK_OBJECTS_WS = "https://api.fink-portal.org/api/v1/objects";
792  private static String FINK_LATESTS_WS = "https://api.fink-portal.org/api/v1/latests";
793  private static String FINK_ANOMALY_WS = "https://api.fink-portal.org/api/v1/anomaly";
794
795  /** Logging . */
796  private static Logger log = LogManager.getLogger(FinkGremlinRecipies.class);
797
798  }