001package com.Lomikel.HBaser;
002
003import com.Lomikel.Utils.LomikelException;
004
005// HBase
006import org.apache.hadoop.hbase.client.Get;
007import org.apache.hadoop.hbase.client.Result;
008import org.apache.hadoop.hbase.client.Get;
009import org.apache.hadoop.hbase.client.Result;
010
011// SQL
012import java.sql.Connection;
013import java.sql.DriverManager;
014import java.sql.SQLException;
015import java.sql.ResultSet;
016import java.sql.ResultSetMetaData;
017import java.sql.Statement;
018
019// Java
020import java.util.Map;  
021import java.util.TreeMap;  
022
023// Log4J
024import org.apache.logging.log4j.Logger;
025import org.apache.logging.log4j.LogManager;
026
027/** <code>HBaseSQLClient</code> adds SQL-search possibility and SQL Phoenix upsert
028  * possibility to {@link HBaseClient}. 
029  * @opt attributes
030  * @opt operations
031  * @opt types
032  * @opt visibility
033  * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
034// TBD: reuse Phoenixer
035public class HBaseSQLClient extends HBaseClient {
036  
037  /** Create and connect to HBase.
038    * @param zookeepers The comma-separated list of zookeper ids.
039    * @param clientPort The client port. 
040    * @throws LomikelException If anything goes wrong. */
041  public HBaseSQLClient(String zookeepers,
042                        String clientPort) throws LomikelException {
043    super(zookeepers, clientPort);
044    }
045        
046  /** Create and connect to HBase.
047    * @param zookeepers The comma-separated list of zookeper ids.
048    * @param clientPort The client port. 
049    * @throws LomikelException If anything goes wrong. */
050  public HBaseSQLClient(String zookeepers,
051                        int    clientPort) throws LomikelException {
052    super(zookeepers, clientPort);
053    }
054    
055  /** Create and connect to HBase.
056    * @param url The HBase url.
057    * @throws LomikelException If anything goes wrong. */
058  public HBaseSQLClient(String url) throws LomikelException {
059    super(url);
060    }
061         
062  /** Get row(s) using SQL query.
063    * The Phoenix SQL View should be already created using {@link #sqlTableCreationCommand}. 
064    * @param sql     The (Phoenix) SQL query.
065    *                The <code>ROWKEY</code> should be included in the <code>SELECT<code>
066    *                part (explicitely or implicitely).
067    *                The small and mixed case variables should be closed in quotation marks. 
068    * @param ifkey   Whether add also entries keys (as <tt>key:key</tt>).
069    * @return        The {@link Map} of {@link Map}s of results as <tt>key-&t;{family:column-&gt;value}</tt>. */
070  // TBD: handle binary columns
071  // TBD: handle iftime
072  // TBD: handle WHERE
073  public Map<String, Map<String, String>> scan(String    sql,
074                                               boolean   ifkey) {
075    Map<String, Map<String, String>> results = new TreeMap<>();
076    Map<String, String> result;
077    Statement stmt;
078    try {
079      stmt = conn().createStatement();
080      }
081    catch (SQLException e) {
082      log.error("Cannot create statement", e);
083      return null;
084      }
085    try {
086      ResultSet rs = stmt.executeQuery(sql);
087      ResultSetMetaData rsmd = rs.getMetaData();
088      int n = rsmd.getColumnCount();
089      while (rs.next()) {
090        result = new TreeMap<>();
091        if (addResult(rs, rsmd, result, ifkey)) {
092          results.put(rs.getString("rowkey"), result);
093          }
094        }
095      }
096    catch (SQLException e ) {
097      log.error("Cannot execute: " + sql, e);
098      } 
099    return results;
100    }
101    
102  /** Add {@link Result} into result {@link Map}.
103    * @param rs      The {@link ResultSet} to add (the current row).
104    * @param rsmd    The {@link ResultSetMetaData}.
105    * @param result  The {@link Map} of results <tt>familty:column-&gt;value</tt>.
106    * @param ifkey   Whether add also entries keys (as <tt>key:key</tt>).
107    * @return        Whether the result has been added. */
108  private boolean addResult(ResultSet           rs,
109                            ResultSetMetaData   rsmd,
110                            Map<String, String> result,
111                            boolean             ifkey) throws SQLException {
112    String columnName;
113    int n = rsmd.getColumnCount();
114    boolean isSchema = false;
115    if (rs.getString("ROWKEY").startsWith("schema")) {
116      isSchema = true;
117      }
118    for (int i = 1; i <= n; i++) {
119      columnName = rsmd.getColumnName(i);
120      if (columnName.equals("ROWKEY")) {
121        if (ifkey) {
122          result.put("key:key", rs.getString(i));
123          }
124        }
125      else if (!isSchema && _simpleSchema != null && _simpleSchema.type(columnName) != null) {
126        result.put(rsmd.getColumnName(i), _simpleSchema.decode(columnName, rs.getBytes(i)));
127        }
128      else {
129        result.put(columnName, rs.getString(i));
130        }
131      }
132    return true;
133    }
134    
135  /** Give SQL view creation command for this HBase table.
136    * It creates the Phoenix SQL view of the the current HBase table.
137    * @return The SQL view creation command for this HBase table. */
138  public String sqlViewCreationCommand() {
139    HBaseSchema hs = (HBaseSchema)schema();
140    return hs.toSQLView(tableName());
141    }
142   
143  /** Give SQL table creation command for this HBase table.
144    * It creates the SQL tabel with the same properties are the current HBase table.
145    * Using the default table name.
146    * @return The SQL table creation command for this HBase table. */
147  public String sqlTableCreationCommand() {
148    return schema().toSQL(tableName() + "_" + schema().name().replaceAll("\\.", "__"));
149    }
150
151  /** Replicate row(s) into Phoenix SQL table.
152    * @param key          The row key. Disables other search terms.
153    *                     It can be <tt>null</tt>.
154    * @param search       The search terms as <tt>family:column:value,...</tt>.
155    *                     Key can be searched with <tt>family:column = key:key<tt> "pseudo-name".
156    *                     <tt>key:startKey</tt> and <tt>key:stopKey</tt> van restrict search to a key interval.
157    *                     {@link Comparator} can be chosen as <tt>family:column:value:comparator</tt>
158    *                     among <tt>exact,prefix,substring,regex</tt>.
159    *                     The default for key is <tt>prefix</tt>,
160    *                     the default for columns is <tt>substring</tt>.
161    *                     The randomiser can be added with <tt>random:random:chance</tt>.
162    *                     It can be <tt>null</tt>.
163    *                     All searches are executed as prefix searches.    
164    * @param filter       The names of required values as <tt>family:column,...</tt>.
165    *                     <tt>*</tt> = all.
166    * @param start        The time period start timestamp in <tt>ms</tt>.
167    *                     <tt>0</tt> means since the beginning.
168    * @param stop         The time period stop timestamp in <tt>ms</tt>.
169    *                     <tt>0</tt> means till now.
170    * @param ifkey        Whether give also entries keys (as <tt>key:key</tt>).
171    * @param iftime       Whether give also entries timestamps (as <tt>key:time</tt>).
172    * @param sqlTableName The SQL table name. Empty or <tt>null</tt> will use the default name.
173    * @return             The {@link Map} of {@link Map}s of results as <tt>key-&t;{family:column-&gt;value}</tt>. */
174  public Map<String, Map<String, String>> scan2SQL(String    key,
175                                                   String    search,
176                                                   String    filter,
177                                                   long      start,
178                                                   long      stop,
179                                                   boolean   ifkey,
180                                                   boolean   iftime,
181                                                   String    sqlTableName) {
182    log.info("Upserting results into SQL Phoenix table " + _sqlTableName + ", not returning results");
183    _sqlTableName = sqlTableName;
184    Map<String, Map<String, String>> results = scan(key, search, filter, start, stop, ifkey, iftime);
185    _sqlTableName = null;
186    return results;
187    }
188        
189  /** Give JDBC {@link Connection} to Phoenix database.
190    * @return The JDBC {@link Connection} to Phoenix database. */
191  public Connection conn() {
192    if (_conn == null) {
193      log.info("Opening " + zookeepers() + " on port " + clientPort() + " via Phoenix");
194      try {
195        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
196        //Class.forName("org.apache.phoenix.queryserver.client.Driver");
197        }
198      catch (ClassNotFoundException e) {
199        log.error("Cannot find Phoenix JDBC driver", e);
200        return null;
201        }
202      try {
203        _conn = DriverManager.getConnection("jdbc:phoenix:" + zookeepers() + ":" + clientPort()); 
204        //_conn = DriverManager.getConnection("jdbc:phoenix:thin:url=http://" + zookeepers() + ":" + clientPort() + ";serializa‌​tion=PROTOBUF"); 
205        }
206      catch (SQLException e) {
207        log.error("Cannot open connection", e);
208        return null;
209        }
210      if (schema() instanceof HBaseSchema && schema().size() > 0) {
211        _simpleSchema        = (HBaseSchema)schema().simpleSchema();
212        }
213      setProcessor(new HBaseSQLClientProcessor(this));      
214      }
215    return _conn;
216    }
217    
218  @Override
219  public void close() {
220    log.debug("Closing");
221    try {
222      if (_conn != null) {
223        _conn.close();
224        }
225      }
226    catch (SQLException e) {
227      log.warn("Cannot close JDBC", e);
228      }
229    _conn = null;
230    super.close();
231    }
232    
233  /** Give simple {@link HBaseSchema}.
234    * @return The simple {@link HBaseSchema}. */
235  public HBaseSchema simpleSchema() {
236    return _simpleSchema;
237    }
238    
239  /** Give SQL table name.
240    * @return The SQL table name. */
241  public String sqlTableName() {
242    return _sqlTableName;
243    }
244    
245  private HBaseSchema _simpleSchema; 
246    
247  private Connection _conn = null;  
248  
249  private String _sqlTableName = null;
250  
251  /** Logging . */
252  private static Logger log = LogManager.getLogger(HBaseSQLClient.class);
253
254  }