Source code for can.io.sqlite

#!/usr/bin/env python
# coding: utf-8

"""
Implements an SQL database writer and reader for storing CAN messages.

The database schema is given in the documentation of the loggers.
"""

import sys
import time
import threading
import logging
import sqlite3

from can.listener import BufferedReader
from can.message import Message

# Module-level logger used by the reader and writer classes in this module.
log = logging.getLogger('can.io.sql')

# Python 3 has no ``buffer`` builtin. Alias the name to ``memoryview`` so that
# code in this module can call ``buffer(...)`` on binary payloads regardless
# of the major Python version it runs on.
if sys.version_info[0] >= 3:
    buffer = memoryview


class SqliteReader(object):
    """
    Reads recorded CAN messages from a simple SQL database.

    This class can be iterated over or used to fetch all messages in the
    database with :meth:`~SqliteReader.read_all`.

    It can also be used as a context manager, in which case the database
    connection is closed when the ``with`` block is left.

    Calling len() on this object might not run in constant time.
    """

    _SELECT_ALL_COMMAND = "SELECT * FROM messages"
    _COUNT_COMMAND = "SELECT COUNT(*) FROM messages"

    def __init__(self, filename):
        """
        :param filename: database file passed to :func:`sqlite3.connect`
        """
        log.debug("Starting SqliteReader with %s", filename)
        self.conn = sqlite3.connect(filename)
        # Kept for backwards compatibility with code that accesses the
        # attribute directly; the queries in this class each use a cursor
        # of their own so that they cannot disturb one another.
        self.cursor = self.conn.cursor()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # never suppress an exception raised inside the with block
        return False

    @staticmethod
    def _create_frame_from_db_tuple(frame_data):
        """Builds a :class:`Message` from one row of the ``messages`` table.

        :param frame_data: a 7-tuple in column order
            ``(ts, arbitration_id, extended, remote, error, dlc, data)``
        """
        timestamp, can_id, is_extended, is_remote, is_error, dlc, data = frame_data
        # Message is called positionally, and its argument order differs
        # from the column order of the table
        return Message(
            timestamp, is_remote, is_extended, is_error, can_id, dlc, data
        )

    def __iter__(self):
        """Yields one :class:`Message` per row of the ``messages`` table."""
        log.debug("Iterating through messages from sql db")
        # Connection.execute() creates a fresh cursor for every call, so a
        # len() or read_all() call made while this generator is still being
        # consumed does not reset the result set we are iterating over.
        for frame_data in self.conn.execute(self._SELECT_ALL_COMMAND):
            yield self._create_frame_from_db_tuple(frame_data)

    def __len__(self):
        # this might not run in constant time
        result = self.conn.execute(self._COUNT_COMMAND)
        return int(result.fetchone()[0])

    def read_all(self):
        """Fetches all messages in the database.

        :return: a list with one raw database row (a tuple in column order)
            per stored message; the rows are *not* converted to
            :class:`Message` objects. Iterate over the reader to get those.
        """
        result = self.conn.execute(self._SELECT_ALL_COMMAND)
        return result.fetchall()

    def close(self):
        """Closes the connection to the database."""
        self.conn.close()


class SqliteWriter(BufferedReader):
    """Logs received CAN data to a simple SQL database.

    The sqlite database may already exist, otherwise it is created (together
    with the ``messages`` table) as soon as the background writer thread
    starts.

    Messages are internally buffered and written to the SQL file in a
    background thread.

    .. note::

        When the listener's :meth:`~SqliteWriter.stop` method is called the
        thread writing to the sql file will continue to receive and internally
        buffer messages if they continue to arrive before the
        :attr:`~SqliteWriter.GET_MESSAGE_TIMEOUT`.

        If the :attr:`~SqliteWriter.GET_MESSAGE_TIMEOUT` expires before a
        message is received, the internal buffer is written out to the sql
        file.

        However if the bus is still saturated with messages, the Listener
        will continue receiving until the
        :attr:`~SqliteWriter.MAX_TIME_BETWEEN_WRITES` timeout is reached.

    """

    _INSERT_MSG_TEMPLATE = '''
        INSERT INTO messages VALUES
        (?, ?, ?, ?, ?, ?, ?)
        '''

    GET_MESSAGE_TIMEOUT = 0.25
    """Number of seconds to wait for messages from internal queue"""

    MAX_TIME_BETWEEN_WRITES = 5
    """Maximum number of seconds to wait between writes to the database"""

    def __init__(self, filename):
        """
        Starts the background writer thread immediately.

        :param filename: database file passed to :func:`sqlite3.connect`
        """
        super(SqliteWriter, self).__init__()
        self.db_fn = filename
        self.stop_running_event = threading.Event()
        self.writer_thread = threading.Thread(target=self._db_writer_thread)
        self.writer_thread.start()

    def _create_db(self):
        """Opens the connection and creates the ``messages`` table if needed."""
        # Note: you can't share sqlite3 connections between threads
        # hence we setup the db here.
        log.info("Creating sqlite database")
        self.conn = sqlite3.connect(self.db_fn)
        cursor = self.conn.cursor()

        # create table structure
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS messages
        (
          ts REAL,
          arbitration_id INTEGER,
          extended INTEGER,
          remote INTEGER,
          error INTEGER,
          dlc INTEGER,
          data BLOB
        )
        ''')
        self.conn.commit()

    def _db_writer_thread(self):
        """Body of the writer thread: buffers messages and flushes them in batches.

        Runs until :attr:`stop_running_event` is set. The database connection
        is closed when the loop ends, whether it ends normally or because of
        an exception.
        """
        num_frames = 0
        last_write = time.time()
        self._create_db()

        try:
            while not self.stop_running_event.is_set():
                messages = []

                msg = self.get_message(self.GET_MESSAGE_TIMEOUT)
                # keep buffering until get_message() yields None (presumably
                # a timeout) or the batch has been open for too long
                while msg is not None:
                    log.debug("SqliteWriter: buffering message")

                    messages.append((
                        msg.timestamp,
                        msg.arbitration_id,
                        msg.id_type,
                        msg.is_remote_frame,
                        msg.is_error_frame,
                        msg.dlc,
                        buffer(msg.data)
                    ))

                    if time.time() - last_write > self.MAX_TIME_BETWEEN_WRITES:
                        log.debug("Max timeout between writes reached")
                        break

                    msg = self.get_message(self.GET_MESSAGE_TIMEOUT)

                if messages:
                    count = len(messages)
                    # Using the connection as a context manager commits on
                    # success and rolls back on error, so no explicit
                    # commit() is required to make the rows visible.
                    with self.conn:
                        log.debug("Writing %s frames to db", count)
                        self.conn.executemany(SqliteWriter._INSERT_MSG_TEMPLATE, messages)

                    num_frames += count
                    last_write = time.time()

                # go back up and check if we are still supposed to run
        finally:
            # release the database file even if a write failed
            self.conn.close()

        log.info("Stopped sqlite writer after writing %s messages", num_frames)

    def stop(self):
        """Signals the writer thread to stop and blocks until it has finished."""
        self.stop_running_event.set()
        log.debug("Stopping sqlite writer")
        self.writer_thread.join()