Home Reference Source

src/InfluxWriter.ts

import os from 'os'
import { IClusterConfig, InfluxDB, IPoint, ISingleHostConfig, toNanoDate } from 'influx'

import Batcher, { IBatcherOptions } from './Batcher'


type InfluxDbOptions = ISingleHostConfig | IClusterConfig
export type InfluxWriteOptions = InfluxDbOptions & IBatcherOptions & {
    tags: Record<string, string>
}
/**
 * InfluxWriter batches points and writes them to influxdb.
 * @type {module.InfluxWriter}
 */
export default class InfluxWriter {
    protected _counter: number
    private _tags: Record<string, string>
    protected _influx: InfluxDB
    private _batcher: Batcher
    /**
     *
     * @param options
     */
    constructor(options: InfluxWriteOptions) {
        /**
         * @type {InfluxDB}
         */
        this._influx = new InfluxDB(options)
        /**
         * @type {module.Batcher}
         * @private
         */
        this._batcher = new Batcher(options)
        this._batcher.on('flush', (batch: IPoint[]) => this._flushBatch(batch))
        /**
         * Influx will overwrite points using the same timestamp and tags.
         * To work around this we use this _counter field as the nanosecond
         * segment of the timestamp and increment it every time a point is
         * written. This way it is extremely unlikely that any points are
         * overwritten.
         * @type {number}
         * @private
         */
        this._counter = 0
        /**
         * @type {Object<String, String>}
         * @private
         */
        this._tags = options.tags || {}
    }

    /**
     * Add a point to the current batch. If this batch has exceeded the maximum batch
     * size it will be flushed.
     *
     * @param {String} measurement The Influx measurement name.
     * @param {Object.<String, String>} tags The list of tag values to insert.
     * @param {Object.<String, *>} fields The list of field values to insert.
     * @param {Date} [time=new Date()] The time for the measurement.
     */
    writePoint(measurement: string, tags: Record<string, string>, fields: Record<string, unknown>, time = new Date()): void {
        this._counter = (this._counter + 1) % 1000

        this._batcher.write({
            measurement: measurement,
            tags: {hostname: os.hostname(), ...this._tags, ...tags},
            fields: fields,
            timestamp: toNanoDate(
                time.getTime() + '000' + this._counter.toString().padStart(3, '0')
            )
        })
    }

    /**
     * Flush a batch to InfluxDB. This method is triggered by the 'flush' event from Batcher.
     * @param batch
     * @private
     */
    async _flushBatch(batch: IPoint[]): Promise<void> {
        try {
            await this._influx.writePoints(batch, {precision: 'n'})
        } catch(err) {
            console.error(`Failed to write batch to influx: ${err}`)
        }
    }
}