001package com.astrolabsoftware.FinkBrowser.Januser; 002 003import com.Lomikel.Utils.Init; 004import com.Lomikel.Utils.LomikelException; 005import com.Lomikel.HBaser.HBaseClient; 006import com.Lomikel.HBaser.HBaseSchema; 007import com.Lomikel.Januser.JanusClient; 008import com.Lomikel.Januser.GremlinRecipies; 009 010// Tinker Pop 011import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; 012import org.apache.tinkerpop.gremlin.structure.Vertex; 013 014// Janus Graph 015import org.janusgraph.core.JanusGraph; 016import org.janusgraph.core.attribute.Geoshape; 017 018// HBase 019import org.apache.hadoop.hbase.client.Connection; 020import org.apache.hadoop.hbase.util.Bytes; 021import org.apache.hadoop.hbase.client.Result; 022import org.apache.hadoop.hbase.util.Bytes; 023import org.apache.hadoop.hbase.util.Bytes; 024import org.apache.hadoop.hbase.client.Result; 025import org.apache.hadoop.hbase.client.ResultScanner; 026import org.apache.hadoop.hbase.util.Bytes; 027import org.apache.hadoop.hbase.Cell; 028 029// Java 030import java.util.Map; 031import java.util.NavigableMap; 032 033// Log4J 034import org.apache.logging.log4j.Logger; 035import org.apache.logging.log4j.LogManager; 036 037/** <code>StructureCreator</code> generates the network of higher level entities 038 * from the LSST {@link Alert}s. 039 * @opt attributes 040 * @opt operations 041 * @opt types 042 * @opt visibility 043 * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */ 044public class StructureCreator extends JanusClient { 045 046 /** Create JanusGraph structures from the HBase database. 047 * @param args[0] The operation to perform: <tt>populate</tt>. 048 * @param args[1] The file with the complete JanusGraph properties. 049 * @param args[2] The HBase hostname. 050 * @param args[3] The HBase port. 051 * @param args[4] The HBase table to replicate in Graph. 052 * @param args[5] The HBase table schema name. 053 * @param args[6] The label of newly created Vertexes. 054 * @param args[7] The row key name. 055 * @param args[8] The key prefix to limit replication to. 056 * @param args[9] The key to start search from (in ms), may be blank. 057 * @param args[10] The key to stop search at (in ms), may be blank. 058 * @param args[11] The time start search from, may be 0. 059 * @param args[12] The time stop search at, may be 0. 060 * @param args[13] The maximal number of entries to process (-1 means all entries). 061 * @param args[14] The number of entries to skip (-1 or 0 means no skipping). 062 * @param args[15] The number of events to commit in one step (-1 means commit only at the end). 063 * @param args[16] Whether remove all {@link Vertex}es with the define 064 * label before populating or check for each one and only 065 * create it if it doesn't exist yet. 066 * @param args[17] Whether check the existence of the vertex before creating it. 067 * (Index-based verification is disabled for speed.) 068 * @param args[18] Whether fill all variables or just rowkey and lbl. 069 * Overrides partialFill. 070 * @param args[19] List of (coma separated) HBase columns to fill besides the default column. 071 * @param args[20] List of (coma separated) geopoint property name and HBase columns representing lat and long in deg. 072 * @throws LomikelException If anything goes wrong. */ 073 public static void main(String[] args) throws Exception { 074 Init.init(); 075 if (args[0].trim().equals("populate")) { 076 String failedKey = args[9]; 077 do { 078 failedKey = new StructureCreator(args[1]).populateGraphFromHBase( args[2], 079 Integer.valueOf(args[3]), 080 args[4], 081 args[5], 082 args[6], 083 args[7], 084 args[8], 085 failedKey, 086 args[10], 087 Long.valueOf( args[11]), 088 Long.valueOf( args[12]), 089 Integer.valueOf(args[13]), 090 Integer.valueOf(args[14]), 091 Integer.valueOf(args[15]), 092 args[16].equals("true"), 093 args[17].equals("true"), 094 args[18].equals("true"), 095 args[19], 096 args[20]); 097 } 098 while (!failedKey.equals("")); 099 } 100 else { 101 System.err.println("Unknown function " + args[0] + ", try or populate"); 102 System.exit(-1); 103 } 104 } 105 106 /** Create with connection parameters. 107 * @param hostname The HBase hostname. 108 * @param port The HBase port. 109 * @param table The HBase table. */ 110 public StructureCreator(String hostname, 111 int port, 112 String table) { 113 super(hostname, port, table, false); 114 } 115 116 /** Create with connection parameters. 117 * @param hostname The HBase hostname. 118 * @param port The HBase port. 119 * @param table The HBase table. 120 * @param batch Whether open graph for batch loading. */ 121 public StructureCreator(String hostname, 122 int port, 123 String table, 124 boolean batch) { 125 super(hostname, port, table, batch); 126 } 127 128 /** Create with connection properties file. 129 * @param properties The file with the complete properties. */ 130 public StructureCreator(String properties) { 131 super(properties); 132 } 133 134 /** Populate JanusGraph from HBase table. 135 * @param hbaseHost The HBase hostname. 136 * @param hbasePort The HBase port. 137 * @param hbaseTable The HBase table to replicate in Graph. 138 * @param tableSchema The HBase table schema name. 139 * @param label The label of newly created Vertexes. 140 * @param rowkey The row key name. 141 * @param keyPrefixSearch The key prefix to limit replication to. 142 * @param keyStart The key to start search from, may be blank. 143 * @param keyStop The key to stop search at, may be blank. 144 * @param start The time start search from (in ms), may be 0. 145 * @param stop The time stop search at (in ms), may be 0. 146 * @param limit The maximal number of entries to process (-1 means all entries). 147 * @param skip The number of entries to skip (-1 or 0 means no skipping). 148 * @param commitLimit The number of events to commit in one step (-1 means commit only at the end). 149 * @param reset Whether remove all {@link Vertex}es with the define 150 * label before populating or check for each one and only 151 * create it if it doesn't exist yet. 152 * @param getOrCreate Whether check the existence of the vertex before creating it. 153 * (Index-based verification is disabled for speed.) 154 * @param fullFill Whether fill all variables or just rowkey and lbl. 155 * Overrides partialFill. 156 * @param partialFill List of (coma separated) HBase columns to fill besides the default column. 157 * @param geopoint List of (coma separated) geopoint property name and HBase columns representing lat and long in deg. 158 * @return Blank if the population has been executed correctly, the last 159 * sucessfull key otherwise. 160 * @throws LomikelException If anything goes wrong. */ 161 // TBD: allow replacing, updating 162 // TBD: read only rowkey if fullFill = false 163 // TBD: handle binary columns 164 public String populateGraphFromHBase(String hbaseHost, 165 int hbasePort, 166 String hbaseTable, 167 String tableSchema, 168 String label, 169 String rowkey, 170 String keyPrefixSearch, 171 String keyStart, 172 String keyStop, 173 long start, 174 long stop, 175 int limit, 176 int skip, 177 int commitLimit, 178 boolean reset, 179 boolean getOrCreate, 180 boolean fullFill, 181 String partialFill, 182 String geopoint) throws LomikelException { 183 log.info("Populating Graph from " + hbaseTable + "(" + tableSchema + ")@" + hbaseHost + ":" + hbasePort); 184 log.info("\tvertex labels: " + label); 185 log.info("\t" + rowkey + " starting with " + keyPrefixSearch); 186 log.info("\tlimit/skip/commitLimit: " + limit + "/" + skip + "/" + commitLimit); 187 if (reset) { 188 log.info("\tcleaning before population"); 189 } 190 if (getOrCreate) { 191 log.info("\tadd vertex only if non-existent"); 192 } 193 else { 194 log.info("\tadd vertex even if it already exists"); 195 } 196 if (fullFill) { 197 log.info("\tfilling all variables"); 198 } 199 else if (partialFill != null && !partialFill.trim().equals("")) { 200 log.info("\tfilling " + rowkey + ",lbl," + partialFill.trim()); 201 } 202 else { 203 log.info("\tfilling only " + rowkey + " and lbl"); 204 } 205 if (geopoint != null && !geopoint.trim().equals("")) { 206 log.info("\tfilling also geopoint " + geopoint.trim()); 207 } 208 if (!keyStart.equals("")) { 209 log.info("Staring at " + keyStart); 210 } 211 if (!keyStop.equals("")) { 212 log.info("Stopping at " + keyStop); 213 } 214 if (start > 0) { 215 log.info("Staring at " + start + "ms"); 216 } 217 if (stop > 0) { 218 log.info("Stopping at " + stop + "ms"); 219 } 220 GremlinRecipies gr = new GremlinRecipies(this); 221 timerStart(); 222 if (reset) { 223 log.info("Cleaning Graph, vertexes: " + label); 224 g().V().has("lbl", label).drop().iterate(); 225 } 226 commit(); 227 log.info("Connection to HBase table"); 228 HBaseClient hc = new HBaseClient(hbaseHost, hbasePort); 229 hc.connect(hbaseTable, tableSchema); 230 hc.setLimit(0); 231 String searchS = "key:key:" + keyPrefixSearch + ":prefix"; 232 if (!keyStart.equals("")) { 233 searchS += ",key:startKey:" + keyStart; 234 } 235 if (!keyStop.equals("")) { 236 searchS += ",key:stopKey:" + keyStop; 237 } 238 hc.scan(null, searchS, "*", start, stop, false, false); 239 ResultScanner rs = hc.resultScanner(); 240 HBaseSchema schema = hc.schema(); 241 log.info("Populating Graph"); 242 Vertex v; 243 String key; 244 String lastInsertedKey = null; 245 String failedKey = null; 246 String family; 247 String field; 248 String column; 249 String value; 250 byte[] b; 251 String[] cc; 252 byte[] lat; 253 byte[] lon; 254 String[] latclm; 255 String[] lonclm; 256 int i = 0; 257 //for (Result r : rs) { 258 // i++; 259 // key = Bytes.toString(r.getRow()); 260 // for (Cell cell : r.listCells()) { 261 // family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); 262 // column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); 263 // value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); 264 // } 265 // timer(label + "s created", i, 100, commitLimit); 266 // } 267 NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultMap; 268 try { 269 for (Result r : rs) { 270 resultMap = r.getNoVersionMap(); 271 key = Bytes.toString(r.getRow()); 272 if (!key.startsWith("schema")) { 273 if (failedKey == null) { 274 failedKey = key; 275 } 276 i++; 277 if (i <= skip) { 278 continue; 279 } 280 if (limit > 0 && i > limit) { 281 break; 282 } 283 if (getOrCreate) { 284 v = gr.getOrCreate(label, rowkey, key).next(); 285 } 286 else { 287 v = g().addV(label).property(rowkey, key).property("lbl", label).next(); 288 } 289 v.property("hbase", true); 290 if (fullFill) { 291 v.property("fullfill", true); 292 for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> entry : resultMap.entrySet()) { 293 family = Bytes.toString(entry.getKey()); 294 if (!family.equals("b")) { 295 for (Map.Entry<byte[], byte[]> e : entry.getValue().entrySet()) { 296 field = Bytes.toString(e.getKey()); 297 column = family + ":" + field; 298 if (schema != null) { 299 value = schema.decode(column, e.getValue()); 300 } 301 else { 302 value = Bytes.toString(e.getValue()); 303 } 304 v.property(field, value); 305 } 306 } 307 } 308 } 309 else { 310 v.property("fullfill", false); 311 if (partialFill != null && !partialFill.trim().equals("")) { // TBD: optimize 312 for (String clm : partialFill.split(",")) { 313 cc = clm.trim().split(":"); 314 b = resultMap.get(Bytes.toBytes(cc[0])).get(Bytes.toBytes(cc[1])); 315 v.property(cc[1], schema.decode(clm, b)); 316 } 317 } 318 } 319 if (geopoint != null && !geopoint.trim().equals("")) { // TBD: optimize 320 cc = geopoint.split(","); 321 latclm = cc[1].split(":"); 322 lonclm = cc[2].split(":"); 323 lat = resultMap.get(Bytes.toBytes(latclm[0])).get(Bytes.toBytes(latclm[1])); 324 lon = resultMap.get(Bytes.toBytes(lonclm[0])).get(Bytes.toBytes(lonclm[1])); 325 v.property(cc[0], Geoshape.point(Double.valueOf(schema.decode(cc[1], lat)), Double.valueOf(schema.decode(cc[2], lon)) - 180)); 326 } 327 } 328 if (timer(label + "s created", i - 1, 100, commitLimit)) { 329 rs.renewLease(); 330 lastInsertedKey = key; 331 failedKey = null; 332 } 333 } 334 } 335 catch (Exception e) { 336 log.fatal("Failed while inserting " + i + "th vertex,\tlast inserted vertex: " + lastInsertedKey + "\tfirst uncommited vertex: " + failedKey, e); 337 close(); 338 hc.close(); 339 return lastInsertedKey; 340 } 341 timer(label + "s created", i - 1, -1, -1); 342 commit(); 343 close(); 344 hc.close(); 345 return ""; 346 } 347 348 /** Logging . */ 349 private static Logger log = LogManager.getLogger(StructureCreator.class); 350 351 }