skytracker.storage.database_manager.DatabaseManager#
- class skytracker.storage.database_manager.DatabaseManager(username: str, password: str, host: str = 'localhost', port: int = 8123, database: str = '__default__', secure: bool = False)#
Bases:
objectGeneric async database manager for ClickHouse
Requires Docker container with ClickHouse server, which can be set up locally. To set up a local ClickHouse server, follow these steps (assumes Docker is installed):
Pull latest ClickHouse server container
docker pull clickhouse/clickhouse-server
Create an XML file to specify user privileges. Any filename is possible, as long as extension is
.xml. The path to the file is necessary later. Add the following content:<yandex> <users> <default> <access_management>1</access_management> <named_collection_control>1</named_collection_control> <show_named_collections>1</show_named_collections> <show_named_collections_secrets>1</show_named_collections> </default> </users> </yandex>
Run a new server with specific mounts and network settings. The following command runs a container with the created XML file. It mounts a
dataandlogs, which will be located in a directoryclickhousein the directory where the command is executed.docker run -d --name my-server-name --ulimit nofile=262144:262144 -v "$PWD/path/to/crea ted.xml:/etc/clickhouse-server/users.d/default-user-access.xml" -v "clickhouse_data :/var/lib/clickhouse/" -v "clickhouse_logs:/var/log/clickhouse-server" -p 8123: 8123 -p 9000:9000 clickhouse/clickhouse-server
Enter the native client:
docker exec -it my-server-name clickhouse-client
Create a new user with a specified password:
CREATE USER myusername IDENTIFIED BY 'mypassword';
Grant the new user access to all commands:
GRANT ALL ON *.* TO myusername;
Exit the native client:
quitTest connection using curl:
curl http://localhost:8123/ping
Methods
Initialize data manager by storing ClickHouse connection settings
Close client connection
Connect to the ClickHouse server
Create a table in the server
Drop a table
Insert new rows into a table
Check if the client is connected to the ClickHouse databse
Set a new connection status for the database manager
Set the database connection status to False
Perform an SQL query
Check if a specific table exists
- __init__(username: str, password: str, host: str = 'localhost', port: int = 8123, database: str = '__default__', secure: bool = False) None#
Initialize data manager by storing ClickHouse connection settings
- Parameters:
username (str) – server username
password (str) – server password
host (str, optional) – server host. Defaults to ‘localhost’.
port (int, optional) – server port. Defaults to 8123.
database (str, optional) – server database name. Defaults to ‘__default__’.
secure (bool, optional) – whether to use secure connection. Defaults to False.
- async close() None#
Close client connection
- async connect() None#
Connect to the ClickHouse server
- async create_table(name: str, columns: list[str], *args: str) None#
Create a table in the server
- Parameters:
name (str) – name of table to create
columns (list[str]) – column names and data types in SQL format (i.e. “time UInt32”)
args (str) – additional SQL statements for CREATE TABLE command
- async drop_table(name: str) None#
Drop a table
- Parameters:
name (str) – name of table to drop
- async insert(name: str, rows: Sequence[Sequence[Any]], column_names: list[str]) None#
Insert new rows into a table
- Parameters:
name (str) – name of table to insert rows in
rows (Sequence[Sequence[Any]]) – list of new rows
column_names (list[str]) – names of columns corresponding to row values
- async is_connected() bool#
Check if the client is connected to the ClickHouse databse
- Returns:
whether the client is connected to the ClickHouse database
- Return type:
bool
- async set_connected(new_connected: bool = True) None#
Set a new connection status for the database manager
- Parameters:
new_connected (bool, optional) – new connection status. Defaults to True.
- async set_disconnected() None#
Set the database connection status to False
- async sql_query(sql_query: str, time_limit: float = 1.0) Sequence[Sequence[Any]]#
Perform an SQL query
- Parameters:
sql_query (str) – SQL query
time_limit (float, optional) – query execution time limit [s]. Defaults to 1 second.
- Returns:
list of matching rows
- Return type:
Sequence[Sequence[Any]]
- async table_exists(name: str) bool#
Check if a specific table exists
- Parameters:
name (str) – name of table to find
- Returns:
whether specified table exists
- Return type:
bool