Source code for can.io.sqlite

# coding: utf-8

"""
Implements an SQL database writer and reader for storing CAN messages.

.. note:: The database schema is given in the documentation of the loggers.
"""

from __future__ import absolute_import

import sys
import time
import threading
import logging
import sqlite3

from can.listener import BufferedReader
from can.message import Message
from .generic import BaseIOHandler

# Module-level logger used by the reader and writer classes in this file.
log = logging.getLogger('can.io.sqlite')

# Python 2 only: rebind ``memoryview`` to the ``buffer`` builtin. The writer
# in this module wraps message payloads with this name before inserting them.
if sys.version_info < (3,):
    memoryview = buffer


class SqliteReader(BaseIOHandler):
    """
    Reads recorded CAN messages from a simple SQL database.

    This class can be iterated over or used to fetch all messages in the
    database with :meth:`~SqliteReader.read_all`.

    Calling :func:`~builtin.len` on this object might not run in constant time.

    :attr str table_name: the name of the database table used for storing the messages

    .. note:: The database schema is given in the documentation of the loggers.

    .. note:: ``table_name`` is interpolated into the SQL statements as-is,
              so it must come from a trusted source.
    """

    def __init__(self, file, table_name="messages"):
        """
        :param file: a `str` or since Python 3.7 a path like object that points
                     to the database file to use
        :param str table_name: the name of the table to look for the messages

        .. warning:: In contrast to all other readers/writers the Sqlite handlers
                     do not accept file-like objects as the `file` parameter.
                     It also runs in ``append=True`` mode all the time.
        """
        # The base handler must not try to open `file` itself; sqlite3 does that.
        super(SqliteReader, self).__init__(file=None)
        self._conn = sqlite3.connect(file)
        # Kept for backward compatibility; the queries below deliberately do
        # not share this cursor (see `_execute`).
        self._cursor = self._conn.cursor()
        self.table_name = table_name

    def _execute(self, statement_template):
        """Runs a statement against the configured table on a fresh cursor.

        A new cursor per query keeps independent operations from clobbering
        each other: re-executing on a shared cursor discards the pending
        result set, which would silently truncate an iteration in progress
        when e.g. ``len()`` is called inside the loop.

        :param str statement_template: SQL with a single ``{}`` placeholder
                                       for the table name
        :return: the cursor holding the result set
        """
        return self._conn.execute(statement_template.format(self.table_name))

    def __iter__(self):
        """Yields every stored row as a :class:`~can.Message`."""
        for frame_data in self._execute("SELECT * FROM {}"):
            yield SqliteReader._assemble_message(frame_data)

    @staticmethod
    def _assemble_message(frame_data):
        """Builds a :class:`~can.Message` from one database row.

        :param frame_data: a 7-tuple in column order ``(timestamp, can_id,
                           is_extended, is_remote, is_error, dlc, data)``
        :rtype: can.Message
        """
        timestamp, can_id, is_extended, is_remote, is_error, dlc, data = frame_data
        return Message(
            timestamp=timestamp,
            is_remote_frame=bool(is_remote),
            is_extended_id=bool(is_extended),
            is_error_frame=bool(is_error),
            arbitration_id=can_id,
            dlc=dlc,
            data=data
        )

    def __len__(self):
        """Returns the number of rows in the table.

        This issues a ``COUNT(*)`` query, so it might not run in constant time.
        """
        result = self._execute("SELECT COUNT(*) FROM {}")
        return int(result.fetchone()[0])

    def read_all(self):
        """Fetches all messages in the database.

        All rows are loaded eagerly; only the conversion to messages is lazy.

        :rtype: Generator[can.Message]
        """
        result = self._execute("SELECT * FROM {}").fetchall()
        return (SqliteReader._assemble_message(frame) for frame in result)

    def stop(self):
        """Closes the connection to the database.
        """
        super(SqliteReader, self).stop()
        self._conn.close()
class SqliteWriter(BaseIOHandler, BufferedReader):
    """Logs received CAN data to a simple SQL database.

    The sqlite database may already exist, otherwise it will
    be created when the first message arrives.

    Messages are internally buffered and written to the SQL file in a
    background thread. Ensures that all messages that are added before
    calling :meth:`~can.SqliteWriter.stop()` are actually written to the
    database after that call returns. Thus, calling
    :meth:`~can.SqliteWriter.stop()` may take a while.

    :attr str table_name: the name of the database table used for storing the messages
    :attr int num_frames: the number of frames actually written to the database, this
                          excludes messages that are still buffered
    :attr float last_write: the last time a message was actually written to the database,
                            as given by ``time.time()``

    .. note::

        When the listener's :meth:`~SqliteWriter.stop` method is called the
        thread writing to the database will continue to receive and internally
        buffer messages if they continue to arrive before the
        :attr:`~SqliteWriter.GET_MESSAGE_TIMEOUT`.

        If the :attr:`~SqliteWriter.GET_MESSAGE_TIMEOUT` expires before a message
        is received, the internal buffer is written out to the database file.

        However if the bus is still saturated with messages, the Listener
        will continue receiving until the :attr:`~can.SqliteWriter.MAX_TIME_BETWEEN_WRITES`
        timeout is reached or more than
        :attr:`~can.SqliteWriter.MAX_BUFFER_SIZE_BEFORE_WRITES` messages are buffered.

    .. note:: The database schema is given in the documentation of the loggers.

    """

    GET_MESSAGE_TIMEOUT = 0.25
    """Number of seconds to wait for messages from internal queue"""

    MAX_TIME_BETWEEN_WRITES = 5.0
    """Maximum number of seconds to wait between writes to the database"""

    MAX_BUFFER_SIZE_BEFORE_WRITES = 500
    """Maximum number of messages to buffer before writing to the database"""

    def __init__(self, file, table_name="messages"):
        """
        :param file: a `str` or since Python 3.7 a path like object that points
                     to the database file to use
        :param str table_name: the name of the table to store messages in

        .. warning:: In contrast to all other readers/writers the Sqlite handlers
                     do not accept file-like objects as the `file` parameter.
        """
        # The base handler must not try to open `file` itself; sqlite3 does that.
        super(SqliteWriter, self).__init__(file=None)
        self.table_name = table_name
        self._db_filename = file
        self._stop_running_event = threading.Event()

        # These are read and updated by the writer thread, so they have to
        # exist *before* the thread is started. Assigning them afterwards
        # races with the thread: it could fail on a missing `last_write`
        # or have an already incremented `num_frames` reset to zero.
        self.num_frames = 0
        self.last_write = time.time()

        self._writer_thread = threading.Thread(target=self._db_writer_thread)
        self._writer_thread.start()

    def _create_db(self):
        """Creates a new database or opens a connection to an existing one.

        .. note::
            You can't share sqlite3 connections between threads (by default)
            hence we setup the db here. It has the upside of running async.
        """
        log.debug("Creating sqlite database")
        self._conn = sqlite3.connect(self._db_filename)

        # create table structure
        self._conn.cursor().execute("""
        CREATE TABLE IF NOT EXISTS {}
        (
          ts REAL,
          arbitration_id INTEGER,
          extended INTEGER,
          remote INTEGER,
          error INTEGER,
          dlc INTEGER,
          data BLOB
        )
        """.format(self.table_name))
        self._conn.commit()

        self._insert_template = "INSERT INTO {} VALUES (?, ?, ?, ?, ?, ?, ?)".format(self.table_name)

    def _db_writer_thread(self):
        """Body of the background thread: batches queued messages into the database.

        Each pass collects messages until the queue stays empty for
        :attr:`GET_MESSAGE_TIMEOUT` seconds, or until the time/size limits
        force a write, then inserts the batch in a single transaction.
        """
        self._create_db()

        try:
            while True:
                messages = []  # reset buffer

                msg = self.get_message(self.GET_MESSAGE_TIMEOUT)
                while msg is not None:
                    messages.append((
                        msg.timestamp,
                        msg.arbitration_id,
                        msg.is_extended_id,
                        msg.is_remote_frame,
                        msg.is_error_frame,
                        msg.dlc,
                        memoryview(msg.data)
                    ))

                    # force a write if the last one is too long ago or the
                    # batch has grown too large
                    if time.time() - self.last_write > self.MAX_TIME_BETWEEN_WRITES or \
                       len(messages) > self.MAX_BUFFER_SIZE_BEFORE_WRITES:
                        break

                    # just go on
                    msg = self.get_message(self.GET_MESSAGE_TIMEOUT)

                count = len(messages)
                if count > 0:
                    # The connection's context manager commits on success
                    # (and rolls back on error), which makes the changes
                    # visible to the entire database.
                    with self._conn:
                        self._conn.executemany(self._insert_template, messages)
                    self.num_frames += count
                    self.last_write = time.time()

                # check if we are still supposed to run and go back up if yes
                # NOTE(review): this exits after a single flush once stop was
                # requested, even if the batch above was cut short by the
                # time/size limit; anything still queued at that point is not
                # written. Confirm whether that is intended.
                if self._stop_running_event.is_set():
                    break

        finally:
            self._conn.close()
            log.info("Stopped sqlite writer after writing %d messages", self.num_frames)

    def stop(self):
        """Stops the reader and writes all remaining messages to the database. Thus, this
        might take a while and block.
        """
        BufferedReader.stop(self)
        self._stop_running_event.set()
        # wait for the writer thread to flush and close its connection
        self._writer_thread.join()
        BaseIOHandler.stop(self)