001package com.astrolabsoftware.FinkBrowser.Januser;
002
003import com.Lomikel.HBaser.HBaseClient;
004import com.Lomikel.Utils.LomikelException;
005import com.astrolabsoftware.FinkBrowser.FinkPortalClient.FPC;
006import com.astrolabsoftware.FinkBrowser.HBaser.Clusteriser.ClusterFinder;
007
008// org.json
009import org.json.JSONArray;
010import org.json.JSONObject;
011
012// Java
013import java.util.Arrays;
014import java.util.Set;
015import java.util.TreeSet;
016import java.util.Map;
017import java.util.TreeMap;
018import java.io.IOException;
019
020// Log4J
021import org.apache.logging.log4j.Logger;
022import org.apache.logging.log4j.LogManager;
023
024/** <code>FeaturesClassifier</code> classifies sources according to
025  * HBase <tt>lc_features_*</tt> field.
026  * @opt attributes
027  * @opt operations
028  * @opt types
029  * @opt visibility
030  * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
031// BUG: jd should be String or long
032public class FeaturesClassifier implements Classifier {
033  
034  @Override
035  public void classify(FinkGremlinRecipies recipies,
036                       String              oid,
037                       String              hbaseUrl,
038                       boolean             enhance,
039                       String              columns) throws LomikelException {
040    double jd;
041    String cl;
042    Map<String, String> value;
043    String[] featuresS;
044    double[] featuresD;
045    String fg;
046    String fr;
047    Map<String, Set<Double>> classes; // cl -> [jd]
048    Set<Double> jds;
049    String key;
050    Set<Double> val;
051    double weight;
052    Map<String, Map<String, String>> alerts = client(hbaseUrl).scan(null,
053                                                                    "key:key:" + oid + ":prefix",
054                                                                    "i:jd,d:lc_features_g,d:lc_features_r",
055                                                                    0,
056                                                                    0,
057                                                                    false,
058                                                                    false);
059    classes = new TreeMap<>();
060    // get all alerts (jd) and their classses
061    for (Map.Entry<String, Map<String, String>> entry : alerts.entrySet()) {
062      value = entry.getValue();
063      jd = Double.parseDouble(value.get("i:jd"));
064      if (value.containsKey("d:lc_features_g") &&
065          value.containsKey("d:lc_features_r")) {
066        fg = value.get("d:lc_features_g").replaceFirst("\\[", "").replaceAll("]$", "");
067        fr = value.get("d:lc_features_r").replaceFirst("\\[", "").replaceAll("]$", "");
068        featuresS = (fg + "," + fr).replaceAll("null", "0.0").
069                                    replaceAll("NaN", "0.0").
070                                    split(",");
071        featuresD = Arrays.stream(featuresS).
072                           mapToDouble(Double::parseDouble).
073                           toArray();
074        cl = String.valueOf(finder().transformAndPredict(featuresD));                  
075        if (!cl.equals("-1")) {
076          if (classes.containsKey(cl)) {
077            jds = classes.get(cl);
078            jds.add(jd);
079            }
080          else {
081            jds = new TreeSet<Double>();
082            jds.add(jd);
083            classes.put(cl, jds);
084            }
085          }
086        }
087      }
088    for (Map.Entry<String, Set<Double>> cls : classes.entrySet()) {
089      key = "FC-" + cls.getKey();
090      val = cls.getValue();
091      weight = val.size();
092      recipies.registerSourcesOfInterest(Classifiers.FEATURES, key, oid, weight, val, hbaseUrl, enhance, columns);
093      }
094    }
095  
096  /** Give {@link HBaseClient} to current database. Singleton.
097    * @param hbaseUrl The full HBase url <tt>ip:port:table:schema</tt>.
098    * @return         The corresponding {@link HBaseClient}.
099    * @throws LomikelExceltion If {@link HBaseClient} cannot be created. */
100  private HBaseClient client(String hbaseUrl) throws LomikelException {
101    if (_client == null) {
102      String[] hbaseUrlA = hbaseUrl.split(":");
103      if (hbaseUrlA.length < 4) {
104        throw new LomikelException("Cannot create HBase client, hbaseUrl uncomplete: " + hbaseUrl);
105        }
106      _client = new HBaseClient(hbaseUrlA[0], hbaseUrlA[1]);
107      _client.connect(hbaseUrlA[2], hbaseUrlA[3]);
108      }
109    return _client;
110    }
111    
112  /** Give {@link ClusterFinder} to current database. Singleton.
113    * @return The corresponding {@link ClusterFinder}. 
114    * @throws LomikelExceltion If {@link ClusterFinder} cannot be created. */
115  private ClusterFinder finder() throws LomikelException {
116    if (_finder == null) {
117      if (_dirName == null) {
118        _dirName = "/tmp";
119        }
120      try {
121        _finder = new ClusterFinder(_dirName + "/scaler_params.json",
122                                    _dirName + "/pca_params.json",
123                                    _dirName + "/cluster_centers.json");
124        }
125      catch (IOException e) {
126        throw new LomikelException("Cannot create Cluster Finder", e);
127        }
128      }
129    return _finder;
130    }
131    
132  /** Set the directory for model json files
133    * <tt>scaler_params.json, pca_params.json, cluster_centers.json</tt>.
134    * If not set, <tt>/tmp</tt> will be used.
135    * @param dirName The directory for model json files. */
136  public void setModelDirectory(String dirName) {
137    _dirName = dirName;
138    }
139    
140  private static HBaseClient _client;
141  
142  private static ClusterFinder _finder;
143  
144  private static String _dirName;
145
146  /** Logging . */
147  private static Logger log = LogManager.getLogger(FeaturesClassifier.class);
148  
149  }
150           
151