001package com.astrolabsoftware.FinkBrowser.Januser;
002
003import com.Lomikel.Utils.Init;
004import com.Lomikel.Utils.LomikelException;
005import com.Lomikel.HBaser.HBaseClient;
006import com.Lomikel.HBaser.HBaseSchema;
007import com.Lomikel.Januser.JanusClient;
008import com.Lomikel.Januser.GremlinRecipies;
009
010// Tinker Pop
011import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;
012import org.apache.tinkerpop.gremlin.structure.Vertex;
013
014// Janus Graph
015import org.janusgraph.core.JanusGraph;
016import org.janusgraph.core.attribute.Geoshape;
017
018// HBase
019import org.apache.hadoop.hbase.client.Connection;
020import org.apache.hadoop.hbase.util.Bytes;
021import org.apache.hadoop.hbase.client.Result;
022import org.apache.hadoop.hbase.util.Bytes;
023import org.apache.hadoop.hbase.util.Bytes;
024import org.apache.hadoop.hbase.client.Result;
025import org.apache.hadoop.hbase.client.ResultScanner;
026import org.apache.hadoop.hbase.util.Bytes;
027import org.apache.hadoop.hbase.Cell;
028
029// Java
030import java.util.Map;
031import java.util.NavigableMap;
032
033// Log4J
034import org.apache.logging.log4j.Logger;
035import org.apache.logging.log4j.LogManager;
036
037/** <code>StructureCreator</code> generates the network of higher level entities
038  * from the LSST {@link Alert}s.
039  * @opt attributes
040  * @opt operations
041  * @opt types
042  * @opt visibility
043  * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
044public class StructureCreator extends JanusClient {
045
046  /** Create JanusGraph structures from the HBase database.
047    * @param args[0]  The operation to perform: <tt>populate</tt>.
048    * @param args[1]  The file with the complete JanusGraph properties.
049    * @param args[2]  The HBase hostname.
050    * @param args[3]  The HBase port.
051    * @param args[4]  The HBase table to replicate in Graph.
052    * @param args[5]  The HBase table schema name.
053    * @param args[6]  The label of newly created Vertexes.
054    * @param args[7]  The row key name.
055    * @param args[8]  The key prefix to limit replication to.
056    * @param args[9]  The key to start search from (in ms), may be blank.
057    * @param args[10] The key to stop search at (in ms), may be blank.
058    * @param args[11] The time start search from, may be 0.
059    * @param args[12] The time stop search at, may be 0.
060    * @param args[13] The maximal number of entries to process (-1 means all entries).
061    * @param args[14] The number of entries to skip (-1 or 0 means no skipping).
062    * @param args[15] The number of events to commit in one step (-1 means commit only at the end).
063    * @param args[16] Whether remove all {@link Vertex}es with the define
064    *                 label before populating or check for each one and only
065    *                 create it if it doesn't exist yet.
066    * @param args[17] Whether check the existence of the vertex before creating it.
067    *                 (Index-based verification is disabled for speed.)
068    * @param args[18] Whether fill all variables or just rowkey and lbl.
069    *                 Overrides partialFill.
070    * @param args[19] List of (coma separated) HBase columns to fill besides the default column.
071    * @param args[20] List of (coma separated) geopoint property name and HBase columns representing lat and long in deg.
072    * @throws LomikelException If anything goes wrong. */
073  public static void main(String[] args) throws Exception {
074    Init.init();
075    if (args[0].trim().equals("populate")) {
076      String failedKey = args[9];
077      do {
078        failedKey = new StructureCreator(args[1]).populateGraphFromHBase(                args[2],
079                                                                         Integer.valueOf(args[3]),
080                                                                                         args[4],
081                                                                                         args[5],
082                                                                                         args[6],
083                                                                                         args[7],
084                                                                                         args[8],
085                                                                                         failedKey,
086                                                                                         args[10],
087                                                                         Long.valueOf(   args[11]),
088                                                                         Long.valueOf(   args[12]),
089                                                                         Integer.valueOf(args[13]),
090                                                                         Integer.valueOf(args[14]),
091                                                                         Integer.valueOf(args[15]),
092                                                                                         args[16].equals("true"),
093                                                                                         args[17].equals("true"),
094                                                                                         args[18].equals("true"),
095                                                                                         args[19],
096                                                                                         args[20]);
097        }
098      while (!failedKey.equals(""));
099      }                             
100    else {
101      System.err.println("Unknown function " + args[0] + ", try or populate");
102      System.exit(-1);
103      }
104    } 
105       
106  /** Create with connection parameters.
107    * @param hostname The HBase hostname.
108    * @param port     The HBase port.
109    * @param table    The HBase table. */
110  public StructureCreator(String hostname,
111                          int    port,
112                          String table) {
113    super(hostname, port, table, false);
114    }
115   
116  /** Create with connection parameters.
117    * @param hostname The HBase hostname.
118    * @param port     The HBase port.
119    * @param table    The HBase table.
120    * @param batch    Whether open graph for batch loading. */
121  public StructureCreator(String  hostname,
122                          int     port,
123                          String  table,
124                          boolean batch) {
125    super(hostname, port, table, batch);
126    }
127    
128  /** Create with connection properties file.
129    * @param properties The file with the complete properties. */
130  public StructureCreator(String properties) {
131    super(properties);
132    }
133      
134  /** Populate JanusGraph from HBase table.
135    * @param hbaseHost       The HBase hostname.
136    * @param hbasePort       The HBase port.
137    * @param hbaseTable      The HBase table to replicate in Graph.
138    * @param tableSchema     The HBase table schema name.
139    * @param label           The label of newly created Vertexes.
140    * @param rowkey          The row key name.
141    * @param keyPrefixSearch The key prefix to limit replication to.
142    * @param keyStart        The key to start search from, may be blank.
143    * @param keyStop         The key to stop search at, may be blank.
144    * @param start           The time start search from (in ms), may be 0.
145    * @param stop            The time stop search at (in ms), may be 0.
146    * @param limit           The maximal number of entries to process (-1 means all entries).
147    * @param skip            The number of entries to skip (-1 or 0 means no skipping).
148    * @param commitLimit     The number of events to commit in one step (-1 means commit only at the end).
149    * @param reset           Whether remove all {@link Vertex}es with the define
150    *                        label before populating or check for each one and only
151    *                        create it if it doesn't exist yet.
152    * @param getOrCreate     Whether check the existence of the vertex before creating it.
153    *                        (Index-based verification is disabled for speed.)
154    * @param fullFill        Whether fill all variables or just rowkey and lbl.
155    *                        Overrides partialFill.
156    * @param partialFill     List of (coma separated) HBase columns to fill besides the default column.
157    * @param geopoint        List of (coma separated) geopoint property name and HBase columns representing lat and long in deg.
158    * @return                Blank if the population has been executed correctly, the last
159    *                        sucessfull key otherwise.
160    * @throws LomikelException If anything goes wrong. */
161  // TBD: allow replacing, updating
162  // TBD: read only rowkey if fullFill = false
163  // TBD: handle binary columns
164  public String populateGraphFromHBase(String  hbaseHost,
165                                       int     hbasePort,
166                                       String  hbaseTable,
167                                       String  tableSchema,
168                                       String  label,
169                                       String  rowkey,
170                                       String  keyPrefixSearch,
171                                       String  keyStart,
172                                       String  keyStop,
173                                       long    start,
174                                       long    stop,
175                                       int     limit,
176                                       int     skip,
177                                       int     commitLimit,
178                                       boolean reset,
179                                       boolean getOrCreate,
180                                       boolean fullFill,
181                                       String  partialFill,
182                                       String  geopoint) throws LomikelException {
183    log.info("Populating Graph from " + hbaseTable + "(" + tableSchema + ")@" + hbaseHost + ":" + hbasePort);
184    log.info("\tvertex labels: " + label);
185    log.info("\t" + rowkey + " starting with " + keyPrefixSearch);
186    log.info("\tlimit/skip/commitLimit: " + limit + "/" + skip + "/" + commitLimit);
187    if (reset) {
188      log.info("\tcleaning before population");
189      }
190    if (getOrCreate) {
191      log.info("\tadd vertex only if non-existent");
192      }
193    else {
194      log.info("\tadd vertex even if it already exists");
195      }
196    if (fullFill) {
197      log.info("\tfilling all variables");
198      }
199    else if (partialFill != null && !partialFill.trim().equals("")) {
200      log.info("\tfilling " + rowkey + ",lbl," + partialFill.trim());
201      }
202    else {
203      log.info("\tfilling only " + rowkey + " and lbl");
204      }
205    if (geopoint != null && !geopoint.trim().equals("")) {
206      log.info("\tfilling also geopoint " + geopoint.trim());
207      }
208    if (!keyStart.equals("")) {
209      log.info("Staring at " + keyStart);
210      }
211    if (!keyStop.equals("")) {
212      log.info("Stopping at " + keyStop);
213      }
214    if (start > 0) {
215      log.info("Staring at " + start + "ms");
216      }
217    if (stop > 0) {
218      log.info("Stopping at " + stop + "ms");
219      }
220    GremlinRecipies gr = new GremlinRecipies(this);
221    timerStart();
222    if (reset) {                        
223      log.info("Cleaning Graph, vertexes: " + label);
224      g().V().has("lbl", label).drop().iterate();
225      }
226    commit();
227    log.info("Connection to HBase table");
228    HBaseClient hc = new HBaseClient(hbaseHost, hbasePort);
229    hc.connect(hbaseTable, tableSchema); 
230    hc.setLimit(0);
231    String searchS = "key:key:" + keyPrefixSearch + ":prefix";
232    if (!keyStart.equals("")) {
233      searchS += ",key:startKey:" + keyStart;
234      }
235    if (!keyStop.equals("")) {
236      searchS += ",key:stopKey:" + keyStop;
237      }
238    hc.scan(null, searchS, "*", start, stop, false, false);
239    ResultScanner rs = hc.resultScanner();
240    HBaseSchema schema = hc.schema();
241    log.info("Populating Graph");
242    Vertex v;
243    String key;
244    String lastInsertedKey = null;
245    String failedKey       = null;
246    String family;
247    String field;
248    String column;
249    String value;
250    byte[] b;
251    String[] cc;
252    byte[] lat;
253    byte[] lon;
254    String[] latclm;
255    String[] lonclm;
256    int i = 0;
257    //for (Result r : rs) {
258    //  i++;
259    //  key = Bytes.toString(r.getRow());
260    //  for (Cell cell : r.listCells()) {
261    //    family = Bytes.toString(cell.getFamilyArray(),    cell.getFamilyOffset(),    cell.getFamilyLength());
262    //    column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
263    //    value  = Bytes.toString(cell.getValueArray(),     cell.getValueOffset(),     cell.getValueLength());
264    //    }
265    //  timer(label + "s created", i, 100, commitLimit);
266    //  } 
267    NavigableMap<byte[], NavigableMap<byte[], byte[]>>   resultMap;
268    try {
269      for (Result r : rs) {
270        resultMap = r.getNoVersionMap();
271        key = Bytes.toString(r.getRow());
272        if (!key.startsWith("schema")) {
273          if (failedKey == null) {
274            failedKey = key;
275            }
276          i++;
277          if (i <= skip) {
278            continue;
279            }
280          if (limit > 0 && i > limit) {
281            break;
282            }
283          if (getOrCreate) {
284            v = gr.getOrCreate(label, rowkey, key).next();
285            }
286          else {
287            v = g().addV(label).property(rowkey, key).property("lbl", label).next();
288            }
289          v.property("hbase", true);
290          if (fullFill) {
291            v.property("fullfill", true);
292            for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> entry : resultMap.entrySet()) {
293              family = Bytes.toString(entry.getKey());
294              if (!family.equals("b")) {
295                for (Map.Entry<byte[], byte[]> e : entry.getValue().entrySet()) {
296                  field = Bytes.toString(e.getKey());
297                  column = family + ":" + field;
298                  if (schema != null) {
299                    value = schema.decode(column, e.getValue());
300                    }
301                  else {
302                    value = Bytes.toString(e.getValue());
303                    }
304                  v.property(field, value);
305                  }
306                }
307              }
308            }
309          else {
310            v.property("fullfill", false);
311            if (partialFill != null && !partialFill.trim().equals("")) { // TBD: optimize
312              for (String clm : partialFill.split(",")) {
313                cc = clm.trim().split(":");
314                b = resultMap.get(Bytes.toBytes(cc[0])).get(Bytes.toBytes(cc[1]));
315                v.property(cc[1], schema.decode(clm, b));
316                }
317              }
318            }
319          if (geopoint != null && !geopoint.trim().equals("")) { // TBD: optimize
320            cc = geopoint.split(",");
321            latclm = cc[1].split(":");
322            lonclm = cc[2].split(":");
323            lat = resultMap.get(Bytes.toBytes(latclm[0])).get(Bytes.toBytes(latclm[1]));
324            lon = resultMap.get(Bytes.toBytes(lonclm[0])).get(Bytes.toBytes(lonclm[1]));
325            v.property(cc[0], Geoshape.point(Double.valueOf(schema.decode(cc[1], lat)), Double.valueOf(schema.decode(cc[2], lon)) - 180));
326            }
327          }
328        if (timer(label + "s created", i - 1, 100, commitLimit)) {
329          rs.renewLease();
330          lastInsertedKey = key;
331          failedKey       = null;
332          }
333        }
334      }
335    catch (Exception e) {
336      log.fatal("Failed while inserting " + i + "th vertex,\tlast inserted vertex: " + lastInsertedKey + "\tfirst uncommited vertex: " + failedKey, e);
337      close();
338      hc.close();
339      return lastInsertedKey;
340      }
341    timer(label + "s created", i - 1, -1, -1);
342    commit();
343    close();
344    hc.close();
345    return "";
346    }
347    
348  /** Logging . */
349  private static Logger log = LogManager.getLogger(StructureCreator.class);
350
351  }