root/trunk/classes/readers/dbreader.php

Revision trunk,185, 17.1 kB (checked in by Suren A. Chilingaryan <csa@dside.dyndns.org>, 7 months ago)

Support for the masks table in DBReader

Line 
1 <?php
2
3 class DBLogGroup extends LOGGROUP {
4  var $table;
5  
6  function __construct(array &$info, DBReader &$rdr, $flags = 0) {
7     parent::__construct($info);
8
9     if (is_array($rdr->groups)) {
10     foreach($rdr->groups as $re => &$table) {
11         if (preg_match($re, $this->gid)) {
12         $this->table = preg_replace($re, $table, $this->gid);
13         return;
14         }
15     }
16     }   
17     $this->table = &$this->gid;
18  }
19 }
20
21 class DBAxes extends GRAPHAxes {
22  function __construct(REQUEST $props = NULL, DATABASE $db, &$info, $flags = 0) {
23     parent::__construct($props, $flags);
24     
25     if (!$info['table'])
26     throw new ADEIException("The axes table is not specified in the reader configuration");
27
28     if (!$info['id'])
29     throw new ADEIException("The id column for axes table is not specified in the reader configuration");
30
31     $query = "";
32     if (is_array($info['properties'])) {
33     foreach ($info['properties'] as $prop => $col) {
34         $query.= ", $col AS ${prop}";
35     }
36     }
37
38     $axes = $db->Query("SELECT {$info['id']} AS axis_id{$query} FROM {$this->col_quote}{$info['table']}$this->col_quote");
39     foreach ($axes as $axis) {
40     $this->axis_info[$axis['axis_id']] = $axis;
41     }
42  }
43 }
44
45
46 class DBReader extends READER {
47  var $db;
48  
49  var $groups;
50  var $tables;
51  var $columns;
52  
53  var $data_request;    // string request with no MASK (pre group)
54  var $data_columns;    // array of data columns for each group
55  var $time_column;    // time column request for each group
56  var $item_info;    // item informations
57  var $axes;        // AXES object
58
59  var $time_sort;
60  var $inverse_sort;
61  
62  var $monitor_timings;
63  var $emit_delays;
64
65  function __construct(&$props) {
66     $this->no_default_time_module = true;
67
68     parent::__construct($props);
69
70     $this->db = new DATABASE($this->server);
71     
72     $this->groups = $this->opts->Get('groups');
73     $this->group_class = "DBLogGroup";
74     
75     $this->tables = $this->opts->Get('tables');
76     $this->columns = $this->opts->Get('columns');
77     $this->data_request = false;
78     $this->data_columns = false;
79     $this->time_column = false;
80     $this->axes = false;
81     $this->item_info = false;
82
83     if (!$this->time_module) {
84     $this->time_format = $this->db->GetTimeFormat();
85     $this->time_module = new READERTime($this, array(
86         'format' => $this->time_format,
87         'timezone' => $this->time_zone
88     ));
89     }
90
91     $time_sort = $this->opts->Get("timesort", 1);
92     if ($time_sort) {
93     if (is_string($time_sort)) {
94         $this->time_sort = $time_sort;
95         $this->inverse_sort = $this->opts->Get("inverse_timesort");
96     } else if ($time_sort>0) {
97         $this->time_sort = $this->db->col_quote . $this->columns['time'] . $this->db->col_quote . " ASC";
98         $this->inverse_sort = $this->db->col_quote . $this->columns['time'] . $this->db->col_quote . " DESC";
99     } else {
100         $this->time_sort = $this->db->col_quote . $this->columns['time'] . $this->db->col_quote . " DESC";
101         $this->inverse_sort = $this->db->col_quote . $this->columns['time'] . $this->db->col_quote . " ASC";
102     }
103     } else {
104     $this->time_sort = false;
105     $this->inverse_sort = false;
106     }
107
108     /* per db only, no per group handling here */   
109     $this->emit_delays = $this->opts->Get("emit_delays");
110     $this->monitor_timings = $this->opts->Get("monitor_timings");
111  }
112
113
114  function GetFilteredMaskList(LOGGROUP $grp = NULL, $maskid = false, $flags = 0) {
115     $grp = $this->CheckGroup($grp, $flags);
116
117     $info = $this->req->GetGroupOption("mask_table", $grp);
118 //    $info = $this->opts->Get("mask_table");
119     if (!$info) return parent::GetMaskList($grp, $flags);
120     
121     if (!is_array($info))
122     throw new ADEIException(translate("Invalid mask table specified in the reader configuration, the array with information should be provided"));
123
124     if (!$info['table'])
125     throw new ADEIException(translate("The mask table is not specified in the reader configuration"));
126
127     if (!$info['id'])
128     throw new ADEIException(translate("The id column for mask table is not specified in the reader configuration"));
129
130
131     $query = "";
132     if (is_array($info['properties'])) {
133     foreach ($info['properties'] as $prop => $col) {
134         $query.= ", {$this->db->col_quote}$col{$this->db->col_quote} AS ${prop}";
135     }
136     }
137     
138     if ($maskid) {
139     $cond = " WHERE {$this->db->col_quote}{$info['id']}{$this->db->col_quote} = {$this->db->text_quote}{$maskid}{$this->db->text_quote}";
140     } elseif ($info['gid']) {
141     $table = $this->db->FixTableName($grp->table);
142     $cond = " WHERE {$this->db->col_quote}{$info['gid']}{$this->db->col_quote} = {$this->db->text_quote}{$table}{$this->db->text_quote}";
143     } else {
144     $cond = "";
145     }
146     $query = "SELECT {$this->db->col_quote}{$info['id']}{$this->db->col_quote} AS id{$query} FROM {$this->db->tbl_quote}{$info['table']}{$this->db->tbl_quote}" . $cond;
147
148     $masks = $this->db->Query($query);
149
150     if ($masks->rowCount()) {
151     $items = $this->GetItemList($grp, NULL, $flags);
152     $hash = array();
153     foreach ($items as $item) {
154         $hash[$item['name']] = $item;
155     }
156     } else if ($maskid) {
157     throw new ADEIException(translate("Mask \"%s\" is not found for group \"%s\"", $maskid, $grp->gid));
158     }
159     
160     $list = array();
161     foreach ($masks as $mask) {
162     $items = explode(",", $mask['mask']);
163     foreach ($items as &$item) {
164         if (!is_numeric($item)) {
165         if (isset($hash[$item])) {
166             $item = $hash[$item]['id'];
167         } else {
168             throw new ADEIException(translate("Item \"%s\" specified in the mask \"%s\" is not available in the group \"%s\"", $item, $mask['id'], $grp->gid));
169         }
170         }
171     }
172
173     $mask['mask'] = implode(",", $items);
174     $mask['id'] =     "maskid" . $mask['id'];
175
176     $list[$mask['id']] = $mask;
177     }
178
179     return array_merge(
180     parent::GetMaskList($grp, $flags),
181     $list
182     );
183  }
184
185  function GetMaskList(LOGGROUP $grp = NULL, $flags = 0) {
186     return $this->GetFilteredMaskList($grp, false, $flags);
187  }
188
189  function CreateMask(LOGGROUP $grp = NULL, array &$minfo = NULL, $flags = 0) {
190     if ($minfo === NULL) {
191     if ($this->req instanceof ITEMGroupRequest)
192         $minfo = $this->req->GetMaskInfo($flags);
193     }
194
195     if (preg_match("/^maskid(\d+)$/", $minfo["db_mask"], $m)) {
196     $res = new MASK(array(), $this, $grp, $flags);
197     
198     $masks = $this->GetFilteredMaskList($grp, $m[1], $flags);
199     $mask = $masks->current();
200
201     if ($mask) {
202         $res->SetIDs($mask['mask'], $mask['id']);
203     }
204     return $res;
205     } else {
206     return new MASK($minfo, $this, $grp, $flags);
207     }
208  }
209
210
211  function CreateAxes($flags = 0) {
212     if ($this->axes) return $this->axes;
213     
214     $axes_table = $this->opts->Get("axes_table");
215     if ($axes_table) {
216     $this->axes = new DBAxes($this->req, $this->db, $axes_table, $flags);
217     return $this->axes;
218     } else {
219     return parent::CreateAxes($flags);
220     }
221  }
222
223  function GetGroupInfo(LOGGROUP $grp = NULL, $flags = 0) {
224     $groups = array();
225
226     $res = $this->db->ShowTables();
227     foreach ($res as $row) {
228     $gid = $row[0];
229     
230     if (is_array($this->tables)) {
231         $found = false;
232         foreach ($this->tables as $re => &$info) {
233
234         if (preg_match($re, $gid)) {
235             if (is_array($info)) {
236             if (isset($info['title'])) {
237                 $name = preg_replace($re, $info['title'], $gid);
238                 $gid = preg_replace($re, $info['gid'], $gid);
239             } else {
240                 $gid = preg_replace($re, $info['gid'], $gid);
241                 $name = $gid;
242             }
243             } else {
244             $gid = preg_replace($re, $info, $gid);
245             $name = $gid;
246             }
247             $found = true;
248                 break;
249         }
250         }
251         if (!$found) continue;
252         if (($grp)&&($grp->gid != $gid)) continue;
253     } else if ($tables) {
254         if (($grp)&&($grp->gid != $gid)) continue;
255         if (!preg_match($this->tables, $gid)) continue;
256         $name = $gid;
257     } else {
258         throw new ADEIException(translate("DBReader has no tables are configured in"));
259     }
260
261     $groups[$gid] = $row;
262     $groups[$gid]['gid'] = $gid;
263     $groups[$gid]['name'] = $name;
264
265     if ($flags&REQUEST::NEED_INFO) {
266         if ($grp) {
267         $grzeus = $grp;
268         } else {
269         $ginfo = array("db_group" => $gid);
270         $grzeus = $this->CreateGroup($ginfo);
271         }
272         
273         $tc = $this->columns['time'];
274         $req = "MIN($tc), MAX($tc)";
275         if ($flags&REQUEST::NEED_COUNT)
276         $req .= ", COUNT($tc)";
277
278         if ($this->monitor_timings) {
279         $tt = $this->req->GetGroupOption("monitor_timings", $grp);
280         if (is_array($tt)) {
281             if (isset($tt['query_limit'])) $tt_limit = $tt['query_limit'];
282             else $tt_limit = 1000000;
283             $tt_exception = $tt['raise_exception'];
284         } else {
285             $tt_limit = 1000000;
286             $tt_exception = false;
287         }
288         $tt_limit += 1000000*microtime(true);
289 /*
290             $tod = gettimeofday();
291         $tod['usec'] += $tt_limit%1000000;
292         if ($tod['usec'] > 999999) {
293             $tod['usec'] -= 1000000;
294             $tod['sec']++;
295         }
296         $tod['sec'] += $tt_limit / 1000000;
297 */
298         }
299         
300         if ($this->emit_delays) {
301         log_message("Sleeping");
302         usleep($this->emit_delays);
303         }
304         
305         $valres = $this->db->Query("SELECT $req FROM " . $this->db->tbl_quote . $grzeus->table . $this->db->tbl_quote, DATABASE::FETCH_NUM);
306         $vals = $valres->fetch(PDO::FETCH_NUM);
307         $valres = NULL;
308         
309         if ($this->monitor_timings) {
310         if (1000000 * microtime(true) > $tt_limit) {
311             $msg = translate("The query on group '%s' is exceeded allowed execution time (exceeding %d msec). This normally indicates inappropriate indexing of the source database. You can overcome the problem by setting '%s' and '%s' options", $grp->gid, ceil(1000*microtime(true) - $tt_limit/1000), "use_cache_reader", "fill_raw_first");
312             if ($tt_exception)
313             throw new ADEIException($msg);
314             else
315             log_message($this->req->GetLocationString() . ": " . $msg);
316         }
317         }
318
319         if (($vals)&&(($vals[0])||($vals[1]))) {
320             $opts = $this->req->GetGroupOptions($grzeus);
321         $limit = $opts->GetDateLimit();
322         
323         $groups[$gid]['first'] = $this->ExportUnixTime($vals[0]);
324             if ((is_int($limit[0]))&&($limit[0] > $groups[$gid]['first'])) {
325                 $groups[$gid]['first'] = $limit[0];
326         } else if ($groups[$gid]['first'] < 0) {
327             $groups[$gid]['first'] = 0;
328         }
329         
330         $groups[$gid]['last'] = $this->ExportUnixTime($vals[1]);
331         if ((is_int($limit[1]))&&($limit[1] < $groups[$gid]['last'])) {
332                 $groups[$gid]['last'] = $limit[1];
333         } else if ($groups[$gid]['last'] < 0) {
334                 $groups[$gid]['last'] = 0;
335         }
336
337         if ($flags&REQUEST::NEED_COUNT)
338             $groups[$gid]['records'] = $vals[2];
339         } else {
340         unset($groups[$gid]['first']);
341         unset($groups[$gid]['last']);
342         }
343
344         if ($flags&REQUEST::NEED_ITEMINFO) {
345         $groups[$gid]['items'] = $this->GetItemList($grzeus);
346         }
347     }
348     }
349
350     return $grp?$groups[$grp->gid]:$groups;
351  }
352
353  function FindItemInfo() {
354     if ($this->item_info) return;
355     
356     $info = $this->opts->Get("item_table");
357     if (!$info) return;
358     
359     if (!$info['table'])
360     throw new ADEIException("The item table is not specified in the reader configuration");
361
362     if (!$info['id'])
363     throw new ADEIException("The id column for item table is not specified in the reader configuration");
364
365     if ($info['gid']) $query = ", {$info['gid']} AS gid";
366     else $query = "";
367     if (is_array($info['properties'])) {
368     foreach ($info['properties'] as $prop => $col) {
369         $query.= ", $col AS ${prop}";
370     }
371     }
372
373     $items = $this->db->Query("SELECT {$info['id']} AS id{$query} FROM {$this->col_quote}{$info['table']}$this->col_quote");
374     if (!$items) return;
375     
376     $this->item_info = array();
377     if ($info['gid']) {
378     $this->item_info['__group_mode__'] = true;
379     foreach ($items as $item) {
380         if (!is_array($this->item_info[$info['gid']])) {
381         $this->item_info[$info['gid']] = array();
382         }
383             $this->item_info[$info['gid']][$item['id']] = $item;
384     }
385     } else {
386     $this->item_info['__group_mode__'] = false;
387     foreach ($items as $item) {
388             $this->item_info[$item['id']] = $item;
389     }
390     }
391  }
392  
393  function GetItemList(LOGGROUP $grp = NULL, MASK $mask = NULL, $flags = 0) {
394     $this->FindItemInfo();
395     
396     if ($flags&REQUEST::ONLY_AXISINFO) {
397     if ((!$this->item_info)&&(!$this->req->GetGroupOptions($grp, "axis"))) return array();
398     }
399
400     $grp = $this->CheckGroup($grp, $flags);
401     if (!$mask) $mask = $this->CreateMask($grp, $info = NULL, $flags);
402
403     $uid = $this->opts->Get('channel_uids', false);
404
405     $items = array();
406
407     $resp = $this->db->ShowColumns($grp->table);
408
409     $pos = 0; $rpos = 0;
410     foreach ($resp as $row) {
411     $name = $row[0];
412     if (!preg_match($this->columns['data'], $name)) continue;
413
414     if (!$mask->Check($pos++)) continue;
415     
416     $items[$rpos] = array(
417         "id" => $pos - 1,
418         "name" =>  $name,
419         "column" => $name
420     );
421
422     if ($this->item_info) {
423         if ($this->item_info['__group_mode__']) {
424         $info = $this->item_info[$grp->gid][$name];
425         } else {
426         $info = $this->item_info[$name];
427         }
428         if (is_array($info)) {
429             unset($info['id']);
430         unset($info['column']);
431         
432         $items[$rpos] = array_merge($items[$rpos], $info);
433         }
434     }
435     
436     if (($uid)&&(!isset($items[$rpos]["uid"]))) {
437         if (($uid === true)||(preg_match($uid, $name))) {
438         $items[$rpos]["uid"] = $name;
439         }
440     }
441
442
443     $rpos++;
444     }
445     
446     if (!$name)
447     throw new ADEIException(translate("DBReader can't find any column in table (%s)", $grp->table));
448
449     if (!$pos)
450     throw new ADEIException(translate("DBReader is not able to find any column matching filter (%s) in table (%s)", $this->columns['data'], $grp->table));
451
452     if ($flags&REQUEST::NEED_AXISINFO) {
453     $this->AddAxisInfo($grp, $items);
454     }
455     
456     return $items;
457  }
458
459  function FindColumns(LOGGROUP $grp, MASK $msk = NULL) {
460     if ((!$this->time_column)||(!$this->time_column[$grp->gid])) {
461     if ($this->time_module)
462         $time_column = $this->columns['time'];
463     else
464         $time_column = $this->db->GetTimeRequest($this->columns['time']);
465         
466
467     $data_columns = array();
468
469     $items = $this->GetItemList($grp);
470     foreach ($items as $item) {
471         array_push($data_columns, $item['column']);
472     }
473
474     $this->data_columns[$grp->gid] = $data_columns;
475     $this->time_column[$grp->gid] = $time_column;
476     }
477     
478
479     if (($mask)&&(is_array($ids = $mask->GetIDs()))) {
480     if (!isset($time_column)) {
481         $data_columns = &$this->data_columns[$grp->gid];
482         $time_column = &$this->time_column[$grp->gid];
483     }
484     $data_request = "";
485     foreach ($ids as $id) {
486         if ($id > sizeof($data_columns))
487             throw new ADEIException(translate("Invalid item mask is supplied. The ID:%d refers non-existing item.", $id));
488         
489         $data_request .=  "{$this->db->col_quote}{$data_columns[$id]}{$this->db->col_quote}, ";
490     }
491
492     return "$data_request $time_column";
493     } elseif ($this->data_request[$grp->gid]) {
494     return $this->data_request[$grp->gid];
495     } else {
496     $this->data_request[$grp->gid] =
497         $this->db->col_quote . implode("{$this->db->col_quote}, {$this->db->col_quote}", $this->data_columns[$grp->gid]) . "{$this->db->col_quote}, {$this->db->col_quote}" . $this->time_column[$grp->gid] . $this->db->col_quote;
498
499     return $this->data_request[$grp->gid];
500     }
501  }   
502
503  function GetRawData(LOGGROUP $grp = NULL, $from = 0, $to = 0, DATAFilter $filter = NULL, &$filter_data = NULL) {
504     $grp = $this->CheckGroup($grp);
505
506     if ((!$from)||(!$to)) {
507         $ivl = $this->CreateInterval($grp);
508     $ivl->Limit($from, $to);
509     
510     $from = $ivl->GetWindowStart();
511     $to = $ivl->GetWindowEnd();
512     }
513
514
515     if ($filter) {
516     $mask = $filter->GetItemMask();
517     $resample = $filter->GetSamplingRate();
518     $limit = $filter->GetVectorsLimit();
519
520     if (isset($filter_data)) {
521         if ($mask) $filter_data['masked'] = true;
522         if ($resample) $filter_data['resampled'] = true;
523         if ($limit) $filter_data['limited'] = true;
524     }
525     } else {
526     $mask = NULL;
527     $resample = 0;
528     $limit = 0;
529     }
530
531     $data_columns = $this->FindColumns($grp, $msk);
532     
533     if ($this->emit_delays) {
534     log_message("Sleeping");
535     usleep($this->emit_delays);
536     }
537
538
539     $selopts = array(
540     "condition" => $this->db->col_quote . $this->columns['time'] . "{$this->db->col_quote} BETWEEN " . $this->ImportUnixTime($from) . " and " . $this->ImportUnixTime($to)
541     );
542     
543     if ($resample) {
544     $selopts['sampling'] = array(
545         "slicer" => $this->time_module->GetTimeSlicingFunction("{$this->db->col_quote}{$this->columns['time']}{$this->db->col_quote}", $resample/*, $from*/),
546         "selector" => "{$this->db->col_quote}{$this->columns['time']}{$this->db->col_quote}"
547     );
548     }
549     
550     if ($limit) {
551     $selopts['limit'] = abs($limit);
552     if ($limit > 0) {
553         $selopts['order'] = $this->time_sort;
554     } else {
555         if (!$this->inverse_sort)
556         throw new ADEIException(translate("The inverse_timesort should be specified in order to select items from the end"));
557         
558         $selopts['order'] = $this->inverse_sort;
559         if ($limit < -1)
560         throw new ADEIException(translate("Current version supports only selecting a single item from the end"));
561     }
562     } else {
563     $selopts['order'] = $this->time_sort;
564     }
565     
566     $query = $this->db->SelectRequest($grp->table, $this->data_request[$grp->gid], $selopts);
567 #    echo $query . "\n";
568
569     $stmt = $this->db->Prepare($query);
570     return new DATABASEData($this, $stmt);
571  }
572
573  function HaveData(LOGGROUP $grp = NULL, $from = 0, $to = 0) {
574     $grp = $this->CheckGroup($grp);
575     if (!$to) $to = time();
576
577     if ($this->emit_delays) {
578     log_message("Sleeping");
579     usleep($this->emit_delays);
580     }
581     $res = $this->db->Query($this->db->SelectRequest($grp->table, array($this->columns['time']), array(
582     "condition" => "{$this->db->col_quote}" . $this->columns['time'] . "{$this->db->col_quote} BETWEEN " . $this->ImportUnixTime($from) . " and " . $this->ImportUnixTime($to),
583     "limit" => 1)), DATABASE::FETCH_NUM);
584     if ($res->fetch(PDO::FETCH_NUM)) return true;
585     return false;
586  }
587  
588 }
589
590 ?>
Note: See TracBrowser for help on using the browser.