Introduction

This is a study of time series data provided by Asset Mapping.

Data

The data source is a compressed SQL output file in the data directory. The format of the data is illustrated by this example:

      object_name       | object_name |         time_stamp         | value 
------------------------+-------------+----------------------------+-------
 London 70bf78cf (None) | NoiseAvg    | 2017-02-02 15:16:04        |  50.9
 London 9c850e0a (None) | NoisePeak   | 2017-03-22 03:46:53        |  36.4

The data starts at line 3. Observation fields are delimited by a | character. The first column is IC_meter, the name of the meter. The second column is value_type, which we analyse below. The time_stamp and value columns hold the time of each observation and the measured value.

TODO: Query data from source with SQL.

Importing Data

#Read one compressed SQL output file into a data frame.
#  path         - path to the .sql.gz file (read.delim decompresses it transparently)
#  column_names - names to give the four columns, as the headings in the source are not what we need
#Returns a data frame with time_stamp converted to POSIXct (UTC). The first two lines of the
#file (heading and separator) are skipped, and the trailing "(N rows)" footer is dropped.
read_sensor_file <- function(path, column_names) {
  #Import data, skipping header rows and without factors for now, so that the data types can be more easily converted
  sensor_df <- read.delim(
    path,
    sep = "|",
    skip = 2,
    col.names = column_names,
    header = FALSE,
    stringsAsFactors = FALSE,
    strip.white = TRUE
  )
  #Delete last row if the values are null. The footer only fills the first column, so its
  #value_type is "" when the column is character, or NA when the file holds no observations
  #and the column is therefore read as logical. Guard against an empty data frame as well.
  last_row <- nrow(sensor_df)
  if (last_row > 0 && sensor_df[last_row, 2] %in% c("", NA)) {
    sensor_df <- sensor_df[-last_row, ]
  }
  #Convert time_stamp string to POSIXct, note that the milliseconds are retained with the %OS format. To see milliseconds when printing, use:
  #op <- options(digits.secs=6)
  #as.character() keeps the conversion working when the column was read as all-NA logical
  sensor_df$time_stamp <- as.POSIXct(as.character(sensor_df$time_stamp), tz = "UTC", format = "%Y-%m-%d %H:%M:%OS")
  sensor_df
}

#Import all source files in the data directory and store them in a matrix, where the row name is the file name
data_files <- list.files(path = "data", pattern = "\\.sql\\.gz$")
#The columns are the source data set, and the unique lists of meters and value_types in that data set
sensors_df_mtrx <- matrix(list(), nrow = length(data_files), ncol = 3)
rownames(sensors_df_mtrx) <- data_files
#Set column names explicitly as the column headings in the source are not what we need
column_names <- c("IC_meter", "value_type", "time_stamp", "value")
#seq_along() means that nothing is read when the data directory holds no matching files
for (row in seq_along(data_files)) {
  file <- data_files[row]
  sensors_df_mtrx[[row, 1]] <- read_sensor_file(file.path("data", file), column_names)
}

Subset Data

#Process sensors_df_mtrx to subset each source data frame into dataframes for each meter and value_type,
#and then replace the source data with the matrix of dataframe subsets.
#seq_len() is used rather than 1:nrow() so that nothing is processed (instead of failing with a
#subscript error) when no source files were imported.
for (row in seq_len(nrow(sensors_df_mtrx))) {
  sensor_df <- sensors_df_mtrx[[row, 1]]
  ic_meters <- unique(sensor_df$IC_meter)
  value_types <- unique(sensor_df$value_type)
  #The matrix will hold a dataframe subset for each IC_meter and value_type, with the name of each row being the IC_meter, and the column the value_type
  meter_value_type_subsets_mtrx <- matrix(list(), nrow = length(ic_meters), ncol = length(value_types))
  rownames(meter_value_type_subsets_mtrx) <- ic_meters
  colnames(meter_value_type_subsets_mtrx) <- value_types
  #Iterate by position so that the matrix cell always matches the meter and value_type being subset
  for (row_counter in seq_along(ic_meters)) {
    icm <- ic_meters[row_counter]
    for (col_counter in seq_along(value_types)) {
      vt <- value_types[col_counter]
      #Keep only the observation columns; the meter and value_type are implied by the cell position
      meter_value_type_subsets_mtrx[[row_counter, col_counter]] <- subset(
        sensor_df,
        (IC_meter == icm & value_type == vt),
        select = -c(IC_meter, value_type)
      )
    }
  }
  #Replace the original dataframe with the matrix of dataframe subsets
  sensors_df_mtrx[[row, 1]] <- meter_value_type_subsets_mtrx
  #Record the unique meter names and value_types
  sensors_df_mtrx[[row, 2]] <- ic_meters
  sensors_df_mtrx[[row, 3]] <- value_types
}

Data Exploration

#Gather a heading for each source file, followed by a line plot of value against time_stamp for
#every meter and value_type that has observations. The gathered tags are printed at the end.
line_plot_gatherer <- htmltools::tagList()
plot_counter <- 0
#seq_len() avoids iterating over a non-existent row when no source files were imported
for (row in seq_len(nrow(sensors_df_mtrx))) {
  #Add the name of the source file
  plot_counter <- plot_counter + 1
  line_plot_gatherer[[plot_counter]] <- htmltools::h1(paste("Source file -", rownames(sensors_df_mtrx)[row]))
  meter_value_type_subsets_mtrx <- sensors_df_mtrx[[row, 1]]
  ic_meters <- sensors_df_mtrx[[row, 2]]
  value_types <- sensors_df_mtrx[[row, 3]]
  for (row_counter in seq_along(ic_meters)) {
    icm <- ic_meters[row_counter]
    for (col_counter in seq_along(value_types)) {
      vt <- value_types[col_counter]
      plot_df <- meter_value_type_subsets_mtrx[[row_counter, col_counter]]
      #Not every meter reports every value_type, so skip the empty combinations
      if (nrow(plot_df) > 0) {
        plot_counter <- plot_counter + 1
        line_plot_gatherer[[plot_counter]] <- plot_ly(plot_df, x = ~time_stamp, y = ~value, type = "scatter", mode = "lines") %>%
          layout(title = paste("Meter -", icm, "- Value Type -", vt))
        #Put some space between the plots
        plot_counter <- plot_counter + 1
        line_plot_gatherer[[plot_counter]] <- htmltools::h1("")
      }
    }
  }
}
line_plot_gatherer

Source file - 2_sensors_Dec_11_2016-Mar_26_2017.sql.gz

Source file - 2_sensors_sensor_type_1.sql.gz

Source file - 8_sensors_sensor_type_2.sql.gz

OpenTSDB Format

This section writes data for import into OpenTSDB, which could then be used with Grafana, for example.

The data format, as described in the OpenTSDB documentation, is:

<metric> <timestamp> <value> <tagk=tagv> [<tagkN=tagvN>]

The data has to be made available on a file system that the OpenTSDB import command can see, and can then be imported like this:

for f in `ls *.tsdb`; do echo $f; sudo /usr/share/opentsdb/bin/tsdb import $f  --auto-metric=true; done
#Write one OpenTSDB import file per meter and value_type into the tsdb directory, plus a
#delete_data.sh script with one "tsdb scan ... --delete" command per exported metric.
#Each line of an import file is: <metric> <timestamp in ms> <value> meter=<meter>
#Remove the output of any previous run so that delete_data.sh is not appended to twice
invisible(file.remove(file.path("tsdb", list.files("tsdb"))))
dir.create("tsdb", showWarnings = FALSE)
#Turn off scientific formatting, in effect. This is set before anything is converted or written
#so that the first exported file is treated the same as the rest.
options(scipen = 500)
#seq_len() avoids iterating over a non-existent row when no source files were imported
for (row in seq_len(nrow(sensors_df_mtrx))) {
  meter_value_type_subsets_mtrx <- sensors_df_mtrx[[row, 1]]
  ic_meters <- sensors_df_mtrx[[row, 2]]
  value_types <- sensors_df_mtrx[[row, 3]]
  for (row_counter in seq_along(ic_meters)) {
    #Remove spaces
    icm <- gsub(" ", "_", ic_meters[row_counter])
    #Convert parentheses to hyphens
    icm <- gsub("[()]", "-", icm)
    #Iterating by position keeps the column aligned with the value_type even when an empty
    #subset is skipped; a manually incremented counter would be left behind by next
    for (col_counter in seq_along(value_types)) {
      export_df <- meter_value_type_subsets_mtrx[[row_counter, col_counter]]
      if (nrow(export_df) == 0) {
        next
      }
      #Remove spaces
      vt <- gsub(" ", "_", value_types[col_counter])
      #Add the meter and the value type as the metric column at the beginning of the dataframe to help ensure uniqueness
      metric <- paste(icm, vt, sep = ".")
      #Debugging aid: inspect one particular metric, only when a person is there to look at it
      if (interactive() && metric == "London_7fb2bee9.NoiseAvg") {
        View(export_df)
      }
      export_df <- cbind(comp_metric = metric, export_df)
      #Add the meter as a tag at the end of the dataframe
      export_df$tag <- paste("meter", icm, sep = "=")
      #Change timestamp to whole milliseconds, written as plain digits. Rounding to two decimals
      #first absorbs floating point error from the multiplication before the fraction is cut off.
      time_stamp_ms <- as.numeric(export_df$time_stamp) * 1000
      export_df$time_stamp <- sprintf("%.0f", trunc(round(time_stamp_ms, 2)))
      #Make sure that all observations are unique
      export_df <- unique(export_df)
      #Order data in ascending order of time_stamp
      export_df <- export_df[order(export_df$time_stamp), ]
      #Export data
      export_file_name <- file.path("tsdb", paste0(icm, ".", vt, ".tsdb"))
      #TODO use HTTP API to import data, though the batch import would probably be better for bulk data
      write.table(export_df, file = export_file_name, sep = " ", quote = FALSE, row.names = FALSE, col.names = FALSE)
      #Create a utility to delete the data that has been added, just so that it is easier to tidy up whilst testing
      write(paste("sudo /usr/share/opentsdb/bin/tsdb scan 1970/01/01-00:00:00 sum", metric, "--delete"), file = "tsdb/delete_data.sh", append = TRUE)
    }
  }
}
#Make the delete script executable once, and only if at least one metric was exported
if (file.exists("tsdb/delete_data.sh")) {
  system("chmod +x tsdb/delete_data.sh")
}