001package com.astrolabsoftware.FinkBrowser.Januser;
002
003import com.Lomikel.Utils.SmallHttpClient;
004import com.Lomikel.Utils.NotifierURL;
005import com.Lomikel.Utils.MapUtil;
006import com.Lomikel.Utils.Pair;
007import com.Lomikel.Utils.LomikelException;
008import com.Lomikel.HBaser.HBaseClient;
009import com.Lomikel.Januser.GremlinRecipies;
010import com.Lomikel.Januser.ModifyingGremlinClient;
011import com.astrolabsoftware.FinkBrowser.HBaser.FinkHBaseClient;
012import com.astrolabsoftware.FinkBrowser.FinkPortalClient.FPC;
013
014// Tinker Pop
015import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
016import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
017import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
018import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.otherV;
019import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.V;
020import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.fold;
021import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.has;
022import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.not;
023import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;
024import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
025import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.repeat;
026import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.values;
027import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.count;
028import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV;
029import static org.apache.tinkerpop.gremlin.process.traversal.P.within;
030import org.apache.tinkerpop.gremlin.structure.Edge;
031import org.apache.tinkerpop.gremlin.structure.Vertex;
032import org.apache.tinkerpop.gremlin.structure.Property;
033import org.apache.tinkerpop.gremlin.structure.VertexProperty;
034import org.apache.tinkerpop.gremlin.structure.Direction;
035import org.apache.tinkerpop.gremlin.structure.Graph;
036
037// Janus Graph
038import org.janusgraph.core.SchemaViolationException;
039import org.janusgraph.graphdb.vertices.StandardVertex;
040import org.janusgraph.graphdb.database.StandardJanusGraph;
041
042// HBase
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Get;
045
046// org.json
047import org.json.JSONArray;
048import org.json.JSONObject;
049
050// Java Mail
051import javax.mail.MessagingException;
052
053// Java
054import java.util.Arrays;
055import java.util.stream.Collectors;
056import java.util.Iterator;
057import java.util.ArrayList;
058import java.util.List;
059import java.util.Map;
060import java.util.HashMap;
061import java.util.LinkedHashMap;
062import java.util.TreeMap;
063import java.util.SortedSet;
064import java.util.TreeSet;
065import java.util.Set;
066import java.util.HashSet;
067import java.util.Iterator;
068import java.util.Optional;
069import java.util.Calendar;
070import java.util.Date;
071import java.text.SimpleDateFormat;
072import java.util.NoSuchElementException;
073
074// Log4J
075import org.apache.logging.log4j.Logger;
076import org.apache.logging.log4j.LogManager;
077
078/** <code>FinkGremlinRecipies</code> provides various recipies to handle
079  * and modify Gremlin Graphs for Fink.
080  * @opt attributes
081  * @opt operations
082  * @opt types
083  * @opt visibility
084  * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
// TBD: check preconditions for methods which don't work with 'client' creation
086public class FinkGremlinRecipies extends GremlinRecipies {
087    
  /** Create and attach to {@link GraphTraversalSource}.
    * @param g The attached {@link GraphTraversalSource} used for all
    *          graph traversals and modifications performed by the recipies. */
  public FinkGremlinRecipies(GraphTraversalSource g) {
    super(g);
    }
093    
  /** Create and attach to {@link ModifyingGremlinClient}.
    * @param client The attached {@link ModifyingGremlinClient} through which
    *               all graph modifications are sent. */
  public FinkGremlinRecipies(ModifyingGremlinClient client) {
    super(client);
    }
099    
100  /** Execute full chain of new sources correlations analyses.
101    * @param classifiers The {@link Classifier}s to be used.
102    *                        They can contain the {@link Classifier} flavor after <em>=</em> symbol.
103    * @param filter          The HBase evaluation formula to be applied.
104    *                        Ignored if <tt>clss</tt> are specified.
105    * @param hbaseUrl        The url of HBase with alerts as <tt>ip:port:table:schema</tt>.
106    * @param nLimit          The maximal number of alerts getting from HBase or Fink Portal.
107    *                        <tt>0</tt> means no limit.
108    * @param timeLimit       How far into the past the search should search (in minutes).
109    * @param clss            An array of <em>classes</em> taken from {@link FPC},
110    *                        if contains <tt>Anomaly</tt>, get anomalies from {@link FPC},                  
111    *                        if <tt>null</tt>, analyse <em>sources</em> from HBase database.
112    * @throws LomikelException If anything fails. */
113  public void processSoI(Classifier[] classifiers,
114                         String       filter,
115                         String       hbaseUrl,
116                         int          nLimit,
117                         int          timeLimit,
118                         String[]     clss) throws LomikelException {
119    fillSoI(classifiers, filter, hbaseUrl, nLimit, timeLimit, clss);
120    generateCorrelations(classifiers);
121    }
122        
123  /** Fill graph with <em>SoI</em>.
124    * @param classifiers The {@link Classifier}s to be used.
125    * @param filter      The HBase evaluation formula to be applied.
126    *                    Ignored if <tt>clss</tt> are specified.
127    * @param hbaseUrl    The url of HBase with alerts as <tt>ip:port:table:schema</tt>.
128    * @param nLimit      The maximal number of alerts getting from HBase or Fink Portal.
129    *                    <tt>0</tt> means no limit.
130    * @param timeLimit   How far into the past the search should search (in minutes).
131    * @param clss        An array of <em>classes</em> taken from {@link FPC},
132    *                    if contains <tt>Anomaly</tt>, get anomalies from {@link FPC},                  
133    *                    if <tt>null</tt>, analyse <em>sources</em> from HBase database.
134
135    * @throws LomikelException If anything fails. */
136  public void fillSoI(Classifier[] classifiers,
137                      String       filter,
138                      String       hbaseUrl,
139                      int          nLimit,
140                      int          timeLimit,
141                      String[]     clss) throws LomikelException {
142    String clssDesc = "";
143    if (clss != null) {
144      clssDesc = "of " + Arrays.toString(clss);
145      }
146    log.info("Filling SoI " + clssDesc + " using " + Arrays.toString(classifiers) + " classifiers, nLimit = " + nLimit + ", timeLimit = " + timeLimit);
147    log.info("Importing from " + hbaseUrl + ":");
148    fhclient(hbaseUrl);
149    Set<String> oids = new HashSet<>();;
150    if (clss == null) { 
151      fhclient().setEvaluation(filter);
152      if (nLimit > 0) {
153        fhclient().setLimit(nLimit);
154        }
155      oids = fhclient().latestsT("i:objectId",
156                                 null,
157                                 timeLimit,
158                                 true);
159      fhclient().setEvaluation(null);
160      }
161    else {
162      Calendar cal;
163      Date d;
164      String sd;
165      JSONArray ja;
166      JSONObject jo;
167      for (String cls : clss) {
168        cal = Calendar.getInstance();
169        cal.add(Calendar.MINUTE, -nLimit);
170        d = cal.getTime();
171        sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(d);
172        if (cls.equals("*")) {
173          ja = FPC.anomaly(new JSONObject().put("n",             nLimit).
174                                            put("startdate",     sd).
175                                            put("columns",       "i:objectId").
176                                            put("output-format", "json"));
177          }
178        else {
179          ja = FPC.latests(new JSONObject().put("n",             nLimit).
180                                            put("class",         cls).
181                                            put("startdate",     sd).
182                                            put("columns",       "i:objectId").
183                                            put("output-format", "json"));
184          }
185        for (int i = 0; i < ja.length(); i++) {
186          jo = ja.getJSONObject(i);
187          oids.add(jo.getString("i:objectId"));
188          }
189        log.info("*** " + cls + "[" + ja.length() + "]:");
190        }
191      }
192    classifySources(classifiers, oids, hbaseUrl);
193    }
194    
195  /** Classify <em>source</em> .
196    * @param classifiers The {@link Classifier}s to be used.
197    * @param oids        The {@link Set} of <tt>objectId</tt>s of source to be added.
198    * @param hbaseUrl    The url of HBase with alerts as <tt>ip:port:table:schema</tt>.
199    * @throws LomikelException If anything fails. */
200  public void classifySources(Classifier[] classifiers,
201                              Set<String>  oids,
202                              String       hbaseUrl) throws LomikelException {
203    int size = oids.size();
204    int n = 0;
205    long dt;
206    double freq;
207    long startTime = System.currentTimeMillis();
208    // loop over all sources
209    for (String oid : oids) {
210      log.info(oid + " (" + n + " of " + size + "):");
211      for (Classifier classifier : classifiers) {
212        try {
213          classifySource(classifier, oid, hbaseUrl);
214          }
215        catch (LomikelException e) {
216          log.error("Cannot get classification for " + oid, e);
217          }
218        }
219      n++;
220      dt = (System.currentTimeMillis() - startTime) / 1000;
221      freq = (double)n / (double)dt;
222      log.info("\t\twith " + String.format("%.2f", freq) + " Hz");
223      }
224    }
225    
226   /** Classify <em>source</em>.
227    * @param classifier The {@link Classifier} to be used.
228    * @param objectId   The <tt>objectId</tt> of source to be added.
229    * @param hbaseUrl   The url of HBase with alerts as <tt>ip:port:table:schema</tt>.
230    * @throws LomikelException If anything fails. */
231  public void classifySource(Classifier classifier,
232                             String     objectId,
233                             String     hbaseUrl) throws LomikelException {  
234    fhclient(hbaseUrl);
235    classifySource(classifier, objectId);
236    }
237   
  /** Classify <em>source</em>.
    * If the source {@link Vertex} already exists, the existing
    * <em>deepcontains</em> {@link Edge}s coming from <em>SoI</em> Vertexes
    * of the same classifier (name and flavor) are removed first, so a
    * re-classification refreshes the Edges instead of duplicating them.
    * @param classifier The {@link Classifier} to be used.
    * @param objectId   The <tt>objectId</tt> of source to be added.
    * @throws LomikelException If anything fails. */
  public void classifySource(Classifier classifier,
                             String     objectId) throws LomikelException {  
    if (g().V().has("lbl", "source").has("objectId", objectId).hasNext()) {
      Vertex v1 = g().V().has("lbl", "source").has("objectId", objectId).next();
      // SoI Vertexes of this classifier already pointing at the source
      List<Vertex> v2s = g().V(v1).in().
                                   has("lbl",        "SoI").
                                   has("classifier", classifier.name()).
                                   has("flavor",     classifier.flavor()).
                                   toList();
      Iterator<Edge> edges;
      for (Vertex v2 : v2s) {
        // collect and drop the old deepcontains Edge(s) between v2 and the source
        edges = g().V(v1).inE().
                          has("lbl", "deepcontains").
                          where(otherV().
                          is(v2)).
                          toStream().
                          iterator();
        while (edges.hasNext()) {
          edges.next().remove(); 
          }
        }        
      // will be commited in registration
      }
    classifier.classify(this, objectId);
    }
267       
268  /** Register  <em>source</em> in <em>SoI</em>.
269    * @param classifier The {@link Classifier} to be used.
270    * @param cls        The type (class) of <em>SoI</em> {@link Vertex}.
271    *                   It will be created if not yet exists.
272    * @param objectId   The objectId of the new <em>Source</em> {@link Vertex}.
273    *                   It will be created if not yet exists.
274    * @param weight     The weight of the connection.
275    *                   Usualy the number of <em>Alerts</em> of this type. 
276    * @param instanceS  The <em>jd</em> of related <em>Alerts</em> as strings separated by comma.
277    *                   Potential square brackets are removed.
278    *                   May be <tt>null</tt> or empty. */
279  public void registerSoI(Classifier classifier,
280                          String     cls,
281                          String     objectId,
282                          double     weight,
283                          String     instancesS,
284                          String     weightsS) {   
285    List<String> instances = new ArrayList<>();
286    List<Double> weights   = new ArrayList<>();
287    if (instancesS != null && !instancesS.trim().equals("")) {
288      for (String instance : instancesS.replaceAll("\\[", "").replaceAll("]", "").split(",")) {
289        instances.add(instance);
290        }
291      }
292    if (weightsS != null && !weightsS.trim().equals("")) {
293      for (String weighs : weightsS.replaceAll("\\[", "").replaceAll("]", "").split(",")) {
294        weights.add(Double.valueOf(weight));
295        }
296      }
297    registerSoI(classifier, cls, objectId, weight, instances, weights);
298    }
299    
  /** Register <em>source</em> in <em>SoI</em>.
    * Both Vertexes are looked up and created if absent (the
    * fold/coalesce 'get or create' idiom); the connecting
    * <em>deepcontains</em> {@link Edge} carries the weights and is
    * committed immediately.
    * @param classifier The {@link Classifier} to be used.
    * @param cls        The type (class) of <em>SoI</em> {@link Vertex}.
    *                   It will be created if not yet exists.
    * @param objectId   The objectId of the new <em>Source</em> {@link Vertex}.
    *                   It will be created if not yet exists.
    * @param weight     The total weight of the connection.
    *                   Usualy the number of <em>Alerts</em> of this type. 
    * @param instances  The <em>jd</em> of related <em>Alerts</em>.
    * @param weights    The weights of related <em>Alerts</em>. */
  public void registerSoI(Classifier   classifier,
                          String       cls,
                          String       objectId,
                          double       weight,
                          List<String> instances,
                          List<Double> weights) {   
    log.info("\tregistering " + objectId + " as " + classifier + " / " + cls + " with weight " + weight);
    // get or create the SoI Vertex for this classifier/flavor/cls
    Vertex soi = g().V().has("lbl",        "SoI"              ).
                         has("classifier", classifier.name()  ).
                         has("flavor",     classifier.flavor()).
                         has("cls",        cls                ).
                         fold().
                         coalesce(unfold(), 
                                  addV("SoI").
                                  property("lbl",        "SoI").
                                  property("classifier", classifier.name()  ).
                                  property("flavor",     classifier.flavor()).
                                  property("cls",        cls                )).
                         next();
    // get or create the source Vertex; importDate is (re)stamped in both cases
    Vertex s = g().V().has("lbl",      "source").
                       has("objectId", objectId).
                       fold().
                       coalesce(unfold(), 
                                addV("source").
                                property("lbl",      "source").
                                property("objectId", objectId)).
                       property("importDate", _now).
                       next();
    // connect SoI -> source; instances and weights are stored as
    // comma-separated strings with the surrounding brackets stripped
    addEdge(g().V(soi).next(),
            g().V(s).next(),
            "deepcontains",
            new String[]{"weight",
                         "instances",
                         "weights"},
            new String[]{"" + weight,
                         instances.toString().replaceFirst("\\[", "").replaceAll("]", ""),
                         weights.toString().replaceFirst("\\[", "").replaceAll("]", "")},
            true);
    commit();
    }
350   
  /** Clean tree under <em>SoI</em>.
    * Drop alerts. Alerts are dropped even if they have other
    * {@link Edge}s.
    * @param classifier The {@link Classifier} to be used.
    * @param cls        The type (class) of <em>SoI</em>.
    * @throws LomikelException If anything goes wrong. */
  public void cleanSoI(Classifier classifier,
                       String     cls) throws LomikelException {
    log.info("Cleaning " + cls + " SoI");
    // two out() steps walk SoI -> source -> alert; the reached alert
    // Vertexes are dropped regardless of any other incoming Edges
    g().V().has("lbl",        "SoI").
            has("classifier", classifier.name()  ).
            has("flavor",     classifier.flavor()).
            has("cls",        cls                ).
            out().
            out().
            drop().
            iterate();
    commit();
    }
370        
371  /** Generate <em>overlaps</em> Edges between <em>SoI</em>.
372    * Possibly between two {@link Classifier}s.
373    * @param classifier The {@link Classifier}s to be used. */
374  public void generateCorrelations(Classifier... classifiers) {
375    log.info("Generating correlations for SoI for " + classifiers);
376    List<String> namesL   = new ArrayList<>();
377    List<String> flavorsL = new ArrayList<>();
378    for (Classifier classifier : classifiers) {
379      namesL.add(  classifier.name()  );
380      flavorsL.add(classifier.flavor());
381      // Clean all correlations 
382      g().V().has("lbl",        "SoI"              ).
383              has("classifier", classifier.name()  ).
384              has("flavor",     classifier.flavor()).
385              bothE().
386              has("lbl", "overlaps").
387              drop().
388              iterate();
389      // Remove wrong SoI
390      g().V().has("lbl",        "SoI"              ).
391              has("classifier", classifier.name()  ).
392              has("flavor",     classifier.flavor()).
393              not(has("cls")).
394              drop().
395              iterate();
396      }
397    String[] names   = namesL.toArray(  String[]::new);
398    String[] flavors = flavorsL.toArray(String[]::new);
399    commit();
400    // Accumulate correlations and sizes
401    Map<String, Double>               weights0 = new HashMap<>(); // cls -> weight (for one source)
402    Map<Pair<String, String>, Double> corrS    = new HashMap<>(); // [cls1, cls2] -> weight (for all sources between SoI-SoI)
403    Map<String, Double>               sizeS    = new HashMap<>(); // cls -> total (for all sources of SoI)
404    SortedSet<String>                 types0   = new TreeSet<>(); // [cls] (for one source)
405    SortedSet<String>                 types    = new TreeSet<>(); // [cls] (for all sources)
406    Vertex source;
407    Iterator<Edge> deepcontainsIt;
408    Edge deepcontains;
409    double weight;
410    double weight1;
411    double weight2;
412    double cor;
413    Vertex soi;
414    Vertex soi1;
415    Vertex soi2;
416    String cls;
417    Pair<String, String> rel;
418    // Loop over sources and accumulated weights to each source
419    GraphTraversal<Vertex, Vertex> sourceT = g().V().has("lbl", "source");
420    while (sourceT.hasNext()) {
421      weights0.clear();
422      types0.clear();
423      source = sourceT.next();
424      deepcontainsIt = source.edges(Direction.IN);
425      // Get all weights to this source
426      while (deepcontainsIt.hasNext()) {
427        deepcontains = deepcontainsIt.next();
428        weight = Double.parseDouble(deepcontains.property("weight").value().toString());
429        soi1 = deepcontains.outVertex();
430        cls = soi1.property("cls").value().toString();
431        types0.add(cls);
432        types.add(cls);
433        weights0.put(cls, weight);
434        }
435      // Double loop over accumulated weights and fill weights between SoIs
436      for (String cls1 : types0) {
437        weight1 = weights0.get(cls1);
438        for (String cls2 : types0) {
439          weight2 = weights0.get(cls2);
440          rel = Pair.of(cls1, cls2);
441          // SoI-SoI
442          if (!corrS.containsKey(rel)) {
443            corrS.put(rel, 0.0);
444            }
445          cor = corrS.get(rel);
446          corrS.put(rel, cor + 1.0);
447          }
448        }
449      }
450    // Fill total sizes
451    double sizeS0;
452    for (String cls1 : types) {
453      sizeS0 = 0.0;
454      for (String cls2 : types) {
455        if (corrS.containsKey(Pair.of(cls1, cls2))) {
456          sizeS0 += corrS.get(Pair.of(cls1, cls2));
457          }
458        }
459      sizeS.put(cls1, sizeS0);
460      }
461    // Create overlaps
462    int ns = 0;
463    // Double-loop over SoI and create overlaps Edge SoI-SoI if non empty 
464    // NOTE: it takes all SoI names and all SoI flavors (even if they are not requested in all combinations)
465    for (String cls1 : types) {
466      try {
467        soi1 = g().V().has("lbl",        "SoI"          ).
468                       has("classifier", within(names)  ).
469                       has("flavor",     within(flavors)).
470                       has("cls",        cls1           ).
471                       next();
472        for (String cls2 : types) {
473          if (corrS.containsKey(Pair.of(cls1, cls2))) {
474            try {
475              soi2 = g().V().has("lbl",        "SoI"          ).
476                             has("classifier", within(names)  ).
477                             has("flavor",     within(flavors)).
478                             has("cls",         cls2          ).
479                             next();
480              addEdge(g().V(soi1).next(),
481                      g().V(soi2).next(),
482                      "overlaps",
483                      new String[]{"intersection",                
484                                   "sizeIn",            
485                                   "sizeOut"},
486                      new Double[]{corrS.get(Pair.of(cls1, cls2)),
487                                   sizeS.get(cls1),
488                                   sizeS.get(cls2)},
489                      true);
490              ns++;
491              }
492            catch (NoSuchElementException e) {
493              log.debug("SoI for " + cls2 + " doesn't exist");
494              }          
495            }  
496          }
497        }
498      catch (NoSuchElementException e) {
499        log.debug("SoI for " + cls1 + " doesn't exist");
500        }          
501      }
502    commit();
503    log.info("" + ns + " source-source correlations generated");
504    }
505        
506  /** Create a new {@link FinkHBaseClient}. Singleton when url unchanged.
507    * @param hbaseUrl The HBase url as <tt>ip:port:table[:schema]</tt>.
508    * @return          The corresponding {@link FinkHBaseClient}, created and initialised if needed.
509    * @throws LomikelException If cannot be created. */
510  public FinkHBaseClient fhclient(String hbaseUrl) throws LomikelException {
511    if (hbaseUrl == null || hbaseUrl.equals(_fhclientUrl)) {
512      return _fhclient;
513      }
514    _fhclientUrl = hbaseUrl;
515    String[] url = hbaseUrl.split(":");
516    String ip     = url[0];
517    String port   = url[1];
518    String table  = url[2];
519    String schema = "";
520    if (url.length >= 4) {
521      schema = url[3];
522      }
523    _fhclient = new FinkHBaseClient(ip, port);
524    _fhclient.connect(table, schema);
525    return _fhclient;
526    }
527    
528  /** Get existing {@link FinkHBaseClient}.
529    * @return The corresponding {@link FinkHBaseClient}.
530    * @throws LomikelException If not yet created. */
531  public FinkHBaseClient fhclient() throws LomikelException {
532    if (_fhclient == null) {
533      throw new LomikelException("FinkHBaseClient not initialised");
534      }
535    return _fhclient;
536    }
537    
  /** Give HBase url.
    * @return The HBase url as <tt>ip:port:table[:schema]</tt>,
    *         or <tt>null</tt> if no client has been created yet. */
  public String hbaseUrl() {
    return _fhclientUrl;
    }
543    
544  private FinkHBaseClient _fhclient;
545  
546  private String _fhclientUrl;
547   
548  private String _now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()).toString();
549 
550  private static String FINK_OBJECTS_WS = "https://api.fink-portal.org/api/v1/objects";
551  private static String FINK_LATESTS_WS = "https://api.fink-portal.org/api/v1/latests";
552  private static String FINK_ANOMALY_WS = "https://api.fink-portal.org/api/v1/anomaly";
553
554  /** Logging . */
555  private static Logger log = LogManager.getLogger(FinkGremlinRecipies.class);
556
557  }