Author: Martin Donovan | GitHub gist | profile.json

Aloha POS DBF-to-PostgreSQL ETL — R: reads binary DBF files (GNDITEM, CAT, ITM, EMP), joins the sales lines against six lookup tables (employees, categories, items, the PLU-to-product-instance mapping, product locations, and bottle sizes), renames to DB schema columns, and bulk-inserts via dbWriteTable (inside a transaction when not in test mode). Orchestrator reads Aloha.ini for the unit number, routes each entity to the correct database, and skips already-imported dates.


# Look up the entity_id registered for an Aloha unit id.
#
# alohaId: value bound to the aloha_id placeholder of the query.
# con:     open DBI connection to the database holding the entities table.
#
# Returns the first cell of the query result coerced to integer; when the
# query returns no rows that cell is NA, so the result is NA_integer_.
getEntityId <- function(alohaId, con) {
  sql <- "select entity_id from entities where aloha_id = $1"
  matches <- dbGetQuery(con, sql, params = list(alohaId))
  firstCell <- matches[1, 1]
  as.integer(firstCell)
}


# Open a PostgreSQL connection to the named database.
#
# db: database name passed to dbConnect as `dbname`.
#
# Host, port, user and password are read from the environment variables
# db_host, db_port, db_user and db_pass at call time.
# Returns the connection object produced by dbConnect.
getDBConnection <- function(db) {
  driver   <- RPostgres::Postgres()
  dbHost   <- Sys.getenv("db_host")
  dbPort   <- Sys.getenv("db_port")
  dbUser   <- Sys.getenv("db_user")
  dbSecret <- Sys.getenv("db_pass")

  dbConnect(driver, dbname = db, host = dbHost, port = dbPort,
            user = dbUser, password = dbSecret)
}

# Report whether pos_chkitems already holds rows for a given sale date.
#
# date:   value bound to the date_of_sale placeholder of the count query.
# entity: accepted for the caller's convenience but not used by the query.
#         NOTE(review): presumably fine because each entity is routed to its
#         own database by the caller; confirm if databases are ever shared.
# con:    open DBI connection to the database holding pos_chkitems.
#
# Returns the first result column compared against zero (TRUE when at least
# one row exists for the date).
importExistsInDB <- function(date, entity, con) {
  sql <- "select count(*) from pos_chkitems where date_of_sale = $1"
  counts <- dbGetQuery(con, sql, params = list(date))
  rowCount <- counts[[1]]
  rowCount > 0
}

# Import the wine sales found in one Aloha data folder into pos_chkitems.
#
# dataDir: folder containing CAT.DBF, ITM.DBF, EMP.DBF and GNDITEM.dbf.
# majorId: value written to the `major` column of every imported row.
# minorId: value written to the `minor` column of every imported row.
# db:      open DBI connection to the target wine database.
# test:    when TRUE, rows already dated 1999-12-31 are deleted, every row is
#          stamped with that date and the insert runs outside a transaction;
#          when FALSE the insert is wrapped in dbWithTransaction.
#
# Sale lines are matched to employees, categories, items, the PLU-to-product
# mapping, default sale locations and bottle sizes with merge(), which keeps
# only rows that have a match in every lookup. Only lines whose category name
# is WINE, BOTTLE WINE or GLASS WINE are kept.
#
# Returns whatever the final dbWriteTable / dbWithTransaction call returns.
importWineSales <- function(dataDir, majorId, minorId, db, test = FALSE) {
  con <- db

  # Lookup tables from the target database.
  bottleSizeDF    <- dbGetQuery(con, "select * from bottle_sizes")
  productLocationDF <- dbGetQuery(con,
    "select pila.product_instance_location_id, pila.product_instance_id
     from product_instances pi, product_instance_location_associations pila
     where pi.product_instance_id = pila.product_instance_id
     and pi.active = true and pila.active_location = true
     and pila.default_sale_location = true")
  PLUDF <- dbGetQuery(con,
    "select pppia.*,
     pi.unit_price, pi.is_club_list_selection,
     pi.is_clearance, pi.bottle_size_id
     from pos_plu_product_instance_associations pppia,
     product_instances pi
     where pi.product_instance_id = pppia.product_instance_id")

  # Lookup tables from the Aloha DBF files.
  catDF <- read.dbf(file.path(dataDir, "CAT.DBF")) %>% select(ID, NAME)
  itmDF <- read.dbf(file.path(dataDir, "ITM.DBF")) %>% select(ID, LONGNAME)
  empDF <- read.dbf(file.path(dataDir, "EMP.DBF")) %>% select(ID, FIRSTNAME, LASTNAME)

  # Sale lines; lines without an item number cannot be matched to a product.
  gnditemDF <- read.dbf(file.path(dataDir, "GNDITEM.dbf")) %>%
    select(EMPLOYEE, CHECK, HOUR, MINUTE, ITEM, QUANTITY, MODCODE, PRICE, SYSDATE, CATEGORY, TABLEID) %>%
    filter(!is.na(ITEM))

  salesDataDF <- merge(gnditemDF, empDF, by.x = "EMPLOYEE", by.y = "ID")
  salesDataDF <- merge(salesDataDF, catDF, by.x = "CATEGORY", by.y = "ID")
  salesDataDF <- merge(salesDataDF, itmDF, by.x = "ITEM",     by.y = "ID") %>%
    filter(NAME %in% c("WINE", "BOTTLE WINE", "GLASS WINE")) %>%
    mutate(
      FIRSTNAME = as.character(FIRSTNAME),
      LASTNAME  = as.character(LASTNAME),
      LONGNAME  = as.character(LONGNAME),
      NAME      = as.character(NAME)
    )

  salesDataDF <- data.frame(salesDataDF)
  salesDataDF <- merge(salesDataDF, PLUDF,            by.x = "ITEM",            by.y = "pos_plu")
  salesDataDF <- merge(salesDataDF, productLocationDF, by.x = "product_instance_id", by.y = "product_instance_id")
  salesDataDF <- merge(salesDataDF, bottleSizeDF,      by.x = "bottle_size_id",  by.y = "bottle_size_id")

  # number_sales_units, sales_unit_id, is_wine_list_bin_number, bottle_size and
  # bottle_size_unit are not created here; presumably they come from the
  # `select *` / `pppia.*` lookups above -- confirm against the schema.
  salesDataDF <- salesDataDF %>%
    mutate(
      is_clearance           = replace_na(is_clearance, FALSE),
      is_club_list_selection = replace_na(is_club_list_selection, FALSE),
      major                  = majorId,
      minor                  = minorId,
      menu_price             = PRICE,
      sales_amt              = PRICE,
      is_option              = FALSE,
      disc_num               = 0,
      tax_code               = 1,
      deletion               = 0,
      overring               = FALSE,
      shift                  = NA
    ) %>%
    arrange(CHECK, HOUR, MINUTE) %>%
    rename(
      date_of_sale              = SYSDATE,
      check_num                 = CHECK,
      sales_cat                 = CATEGORY,
      num_sold                  = QUANTITY,
      user_name_first           = FIRSTNAME,
      user_name_last            = LASTNAME,
      pos_product_name          = LONGNAME,
      sales_units_sold          = number_sales_units,
      unit_cost_at_time_of_sale = unit_price,
      is_club_list_sale         = is_club_list_selection,
      is_clearance_sale         = is_clearance,
      sales_volume_unit_id      = sales_unit_id,
      sale_hour                 = HOUR,
      sale_minute               = MINUTE,
      table_number              = TABLEID,
      user_num                  = EMPLOYEE,
      item_num                  = ITEM
    ) %>%
    # Encode the time as HHMM. Pasting hour and minute without zero-padding
    # made 9:05 come out as 95 and made 1:15 and 11:05 both come out as 115.
    mutate(time_of_sale = as.integer(sale_hour) * 100L + as.integer(sale_minute)) %>%
    select(-c(PRICE, bottle_size_id, MODCODE, NAME,
              is_wine_list_bin_number, bottle_size, bottle_size_unit))

  if (test) {
    # Test runs are parked on a sentinel date so they can be wiped and redone.
    dbExecute(con, "delete from pos_chkitems where date_of_sale = '1999-12-31'")
    salesDataDF <- salesDataDF %>% mutate(date_of_sale = "1999-12-31")
    dbWriteTable(con, "pos_chkitems", salesDataDF,
      row.names = FALSE, overwrite = FALSE, append = TRUE,
      field.types = NULL, temporary = FALSE, copy = TRUE)
  } else {
    dbWithTransaction(con, {
      dbWriteTable(con, "pos_chkitems", salesDataDF,
        row.names = FALSE, overwrite = FALSE, append = TRUE,
        field.types = NULL, temporary = FALSE, copy = TRUE)
    })
  }
}

# Fetch the data_import_source_id of the import source named 'aloha'
# (the name is matched case-insensitively by the query).
#
# con: open DBI connection to the database holding data_import_sources.
#
# Returns the first column of the query result as a vector.
getAlohaDataSourceId <- function(con) {
  sql <- "select data_import_source_id from data_import_sources where lower(data_import_source_name) = 'aloha'"
  sources <- dbGetQuery(con, sql)
  sources[[1]]
}

# Decide whether an Aloha data folder is usable for import.
#
# folder: path of the folder to check.
#
# A folder is valid when it contains Aloha.ini and a GNDLINE.dbf larger than
# 1000 bytes, and Aloha.ini holds exactly one UNITNUMBER entry whose value
# converts to an integer.
#
# Returns a single TRUE or FALSE. A missing, duplicated or non-numeric
# UNITNUMBER, or an ini file that cannot be parsed into key/value columns,
# yields FALSE instead of raising an error.
folderIsValid <- function(folder) {
  ini  <- file.path(folder, "Aloha.ini")
  line <- file.path(folder, "GNDLINE.dbf")

  if (!file.exists(ini) || !file.exists(line) || file.size(line) <= 1000) {
    return(FALSE)
  }

  # The first line is skipped and the rest is split on "=" into V1 (key) and
  # V2 (value). An empty or unreadable file counts as "not valid".
  iniDF <- tryCatch(
    read.csv2(ini, sep = "=", skip = 1, header = FALSE),
    error = function(e) NULL
  )
  if (is.null(iniDF) || ncol(iniDF) < 2) {
    return(FALSE)
  }

  # which() drops NA comparisons, so only exact key matches are counted.
  unitRows <- which(iniDF$V1 == "UNITNUMBER")
  if (length(unitRows) != 1) {
    return(FALSE)
  }

  rNum <- suppressWarnings(as.integer(as.character(iniDF$V2[unitRows])))
  !is.na(rNum)
}

if (!require("pacman")) install.packages("pacman")
pacman::p_load(tidyverse, foreign, RCurl, zip, here, DBI, RPostgreSQL, gtools, RPostgres)

# Orchestrator: walks the most recent dated Aloha folders, resolves each one to
# an entity and its wine database, and imports the day's wine sales unless that
# date is already present.

# When TRUE every folder is imported regardless of the already-imported check,
# and importWineSales runs in its test mode.
test <- FALSE

baseDir    <- case_when(Sys.info()[["sysname"]] == "Windows" ~ "c:/scripts/WineSalesImport", .default = here())
AlohaPath  <- case_when(Sys.info()[["sysname"]] == "Windows" ~ "c:/BootDrv/Aloha",          .default = paste(here(), "/data", sep = ""))

# Load credentials from .Renviron (db_host, db_port, db_user, db_pass)
readRenviron(paste(baseDir, "/.Renviron", sep = ""))
source(paste(baseDir, "/functions.R", sep = ""), echo = FALSE)

LHRCCon      <- getDBConnection("lhrc_data")
entitiesDF   <- dbGetQuery(LHRCCon, "select * from entities")
dataSourceID <- getAlohaDataSourceId(LHRCCon)

wineDBCon <- NULL

numFolderToProcess <- 14

# Get most recent N date-named folders under AlohaPath
folders <- list.dirs(AlohaPath, recursive = FALSE)
folders <- folders[grepl("/\\d{8}$", folders)]
# Sort on the folder name itself so paths containing whitespace still parse.
temp    <- as.Date(basename(folders), "%Y%m%d")
folders <- folders[order(temp, decreasing = TRUE)]
# head() does not pad with NA when fewer than N folders exist.
folders <- head(folders, numFolderToProcess)

for (folder in folders) {
  if (!folderIsValid(folder)) {
    next
  }

  bname      <- basename(folder)
  folderDate <- as.Date(bname, format = "%Y%m%d")

  # folderIsValid has already confirmed that UNITNUMBER is present and numeric.
  ini  <- read.csv2(paste(folder, "/Aloha.ini", sep = ""), sep = "=", skip = 1, header = FALSE)
  rNum <- as.integer(as.character(ini$V2[which(ini$V1 == "UNITNUMBER")]))

  entityRow <- entitiesDF %>% filter(aloha_id == rNum)
  if (nrow(entityRow) != 1) {
    # Without exactly one entity there is no single database to route to.
    warning("Skipping folder ", bname, ": ", nrow(entityRow),
            " entities match Aloha unit ", rNum, call. = FALSE)
    next
  }

  entityNumber <- as.integer(entityRow[["entity_id"]])
  wineDBName   <- as.character(entityRow[["wine_db_name"]])
  majorId      <- as.integer(entityRow[["wine_major_category_id"]])
  minorId      <- as.integer(entityRow[["wine_minor_category_id"]])

  message("Processing folder: ", bname, " | entity: ", entityNumber, " | db: ", wineDBName)

  wineDBCon <- getDBConnection(wineDBName)

  # Errors still abort the run, but the per-entity connection is always closed.
  tryCatch(
    {
      if (test || !importExistsInDB(folderDate, entityNumber, wineDBCon)) {
        importWineSales(folder, majorId, minorId, wineDBCon, test)
      } else {
        message("  Already imported, skipping.")
      }
    },
    finally = dbDisconnect(wineDBCon)
  )
}

dbDisconnect(LHRCCon)