Skip to content

bayesian_ab_testing.data_preparation

This module contains fake data generators that one can use to test the package.

Object-relational mapper


create_ORM

Creates the SQL tables by mapping ORM classes onto the database

Parameters:

Name Type Description Default
path str

path to sqlite db

required
Source code in bayesian_ab_testing\data_preparation\schema.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def create_ORM(path):
    """Creates the sql tables by mapping ORM classes onto the sqlite database.

    Args:
        path (str): path to sqlite db
    """

    # Engine pointing at the sqlite file; all CREATE TABLE statements below
    # are emitted through it.
    engine = create_engine(f'sqlite:///{path}')

    Base = declarative_base()

    # Date dimension: one row per (fake) calendar date.
    class DimDate(Base):
        __tablename__ = "DimDate"

        date_id = Column(Integer, primary_key=True)
        date = Column(DateTime)
        day = Column(Integer)
        month = Column(Integer)
        quarter = Column(Integer)
        year = Column(Integer)


    # Arm dimension: one row per bandit arm.
    class DimArm(Base):
        __tablename__ = "DimArm"

        arm_id = Column(Integer, primary_key=True)
        type = Column(String)
        reward = Column(Float)
        active = Column(Boolean)


    # Customer dimension: one row per fake customer.
    class DimCustomer(Base):
        __tablename__ = "DimCustomer"

        customer_id = Column(Integer, primary_key=True)
        name = Column(String)
        location = Column(String)
        contact = Column(String)


    # Fact table: one row per serve event, referencing the three dimensions.
    class Serve(Base):
        __tablename__ = "Serve"

        serve_id = Column(Integer, primary_key=True)
        date_id = Column(Integer, ForeignKey('DimDate.date_id'))
        customer_id = Column(Integer, ForeignKey('DimCustomer.customer_id'))
        arm_id = Column(Integer, ForeignKey('DimArm.arm_id'))
        information = Column(String)
        result = Column(Boolean, nullable = True)


    # Aggregated per-arm results.
    # NOTE(review): only arm_id is the primary key although customer_id is
    # also stored — if results are per (arm, customer), a composite key may
    # be intended; confirm against the callers.
    class AggregateResult(Base):
        __tablename__ = "AggregateResult"

        arm_id = Column(Integer, primary_key=True)
        customer_id = Column(Integer)
        n_triggered = Column(Integer)
        n_served = Column(Integer)
        a = Column(Float)
        b = Column(Float)
        average_reward = Column(Float)




    # Emit CREATE TABLE for every model registered on Base.
    Base.metadata.create_all(engine)
    del Base
    del engine



Data generation


generate_arm

Generates an arm

Parameters:

Name Type Description Default
arm_id int

arm_id

required

Returns:

Name Type Description
dict dict

arm

Source code in bayesian_ab_testing\data_preparation\data_generator.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def generate_arm(arm_id) -> dict:
    """Generates an arm

    Args:
        arm_id (int): arm_id

    Returns:
        dict: arm
    """

    # Build the record field by field; ~15% of arms come out active.
    arm = {"arm_id": arm_id}
    arm["type"] = fake.word()
    arm["reward"] = np.random.randint(1, 15)
    arm["active"] = np.random.uniform() > 0.85
    return arm



generate_customer

Generates a customer with random info

Parameters:

Name Type Description Default
customer_id int

customer_id

required

Returns:

Name Type Description
dict dict

customer

Source code in bayesian_ab_testing\data_preparation\data_generator.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def generate_customer(customer_id) -> dict:
    """Generates a customer with random info

    Args:
        customer_id (int): customer_id

    Returns:
        dict: customer
    """

    # Assemble the customer record from Faker-generated fields.
    customer = {"customer_id": customer_id}
    customer["name"] = fake.name()
    customer["location"] = fake.street_address()
    customer["contact"] = fake.phone_number()
    return customer



generate_date

Generates a random date in 2023

Parameters:

Name Type Description Default
date_id int

date_id

required

Returns:

Name Type Description
dict dict

date

Source code in bayesian_ab_testing\data_preparation\data_generator.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def generate_date(date_id) -> dict:
    """Generates a random date in 2023

    Args:
        date_id (int): date_id

    Returns:
        dict: date record; day/month/quarter/year are ints
    """

    # Generate a random date between a specific date range
    start_date = datetime(2023, 1, 1)
    end_date = datetime(2023, 12, 31)
    random_date = fake.date_time_between_dates(start_date, end_date)

    # Extract the calendar parts as integers — the DimDate schema declares
    # day/month as Integer columns, so the previous zero-padded
    # strftime('%m')/strftime('%d') strings were the wrong type.
    year = random_date.year
    quarter = (random_date.month - 1) // 3 + 1
    month = random_date.month
    day = random_date.day

    return {
        "date_id": date_id,
        "date": random_date.strftime("%Y-%m-%d"),
        "day": day,
        "month": month,
        "quarter": quarter,
        "year": year
    }



generate_serve

Generates a serve with random info

Parameters:

Name Type Description Default
serve_id int

serve_id

required
date_id int

date_id

required
customer_id int

customer_id

required
arm_id int

arm_id

required
p float

probability of success

required

Returns:

Name Type Description
dict dict

serve

Source code in bayesian_ab_testing\data_preparation\data_generator.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def generate_serve(serve_id, date_id, customer_id, arm_id, p) -> dict:
    """Generates a serve with random info

    Args:
        serve_id (int): serve_id
        date_id (int): date_id
        customer_id (int): customer_id
        arm_id (int): arm_id
        p (float): probability of success

    Returns:
        dict: serve
    """

    return {
        "serve_id": serve_id,
        "date_id": date_id,
        "customer_id": customer_id,
        "arm_id": arm_id,
        "information": json.dumps(fake.profile()),
        # uniform() < p is True with probability p, matching the documented
        # meaning of `p` (the original `>= p` succeeded with probability 1-p).
        "result": np.random.uniform() < p
    }



SQL Handler


SqlHandler

Bases: ISQL_Etiquette

Handles all interactions with the database

Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class SqlHandler(ISQL_Etiquette):
    """Handles all interactions with the database for a single table."""

    def __init__(self, table_name: str) -> None:
        """Initializes the sql handler of the table `table_name`

        Args:
            table_name (str): Which table to tether the handler to

        Raises:
            ValueError: if the table has no primary key or does not exist.
        """
        super().__init__()

        # `db_path` is a module-level name defined elsewhere in the package.
        self.cnxn = sqlite3.connect(db_path)
        self.table_name = table_name

        # Resolve the table's primary-key column name via sqlite's pragma.
        cur = self.exec(f'SELECT l.name FROM pragma_table_info("{table_name}") as l WHERE l.pk = 1;')
        rows = list(cur)
        if not rows:
            raise ValueError(f"table '{table_name}' has no primary key or does not exist")
        self.pk = rows[0][0]


    def __enter__(self):
        """Interfaces python `with` syntax"""
        return self


    def __exit__(self, *args, **kwargs):
        """Exits the python `with` scope, committing and closing the connection"""

        self.close_cnxn()


    def close_cnxn(self) -> None:
        """Close the connection gracefully (commit first so no writes are lost)"""

        logger.info('committing the changes')
        self.cnxn.commit()
        logger.debug('closing connection')
        self.cnxn.close()
        logger.info('the connection has been closed')


    def insert_one(self, **kwargs) -> None:
        """Insert a single row into the table

        Args:
            **kwargs: column-name / value pairs for the new row
        """

        columns = ', '.join(kwargs.keys())
        placeholders = ', '.join(['?'] * len(kwargs))
        # Values go through parameter binding, never into the SQL string.
        query = f"INSERT INTO {self.table_name} ({columns}) VALUES ({placeholders});"
        values = list(kwargs.values())

        # Lazy %-style args: the original passed `query` as the format
        # string, which breaks since it contains no % placeholders.
        logging.debug("%s %s", query, values)

        cur = self.exec(query, values)
        logging.info("insert_one: %s", list(cur))


    def get_table_columns(self) -> list:
        """Gets a list of the column names that belong to the table"""

        cur = self.exec(f"PRAGMA table_info({self.table_name});")
        # Each pragma row is (cid, name, type, notnull, dflt_value, pk).
        column_names = [col[1] for col in cur.fetchall()]
        logger.info(f'the list of columns: {column_names}')

        return column_names


    def truncate_table(self) -> None:
        """Deletes all rows of the table (the table itself is kept)"""

        query = f"delete from {self.table_name} where 1=1;"
        logging.info(f'the {self.table_name} is being truncated')
        self.exec(query)


    def drop_table(self):
        """Drops the table from the database (no-op if it does not exist)"""

        query = f"DROP TABLE IF EXISTS {self.table_name};"
        logging.info(query)

        self.exec(query)
        logging.info(f"table '{self.table_name}' deleted.")


    def insert_many(self, df: pd.DataFrame):
        """Insert many values at once into the table

        Args:
            df (pd.DataFrame): data to insert; columns are matched
                case-insensitively against the table's columns

        Raises:
            ValueError: if the dataframe's column count does not match
                the table's.
        """

        df = df.replace(np.nan, None)  # sqlite stores NULL, not NaN
        df.rename(columns=lambda x: x.lower(), inplace=True)

        columns = list(df.columns)
        logger.info(f'BEFORE the column intersection: {columns}')

        sql_column_names = [i.lower() for i in self.get_table_columns()]
        columns = list(set(columns) & set(sql_column_names))
        logger.info(f'AFTER the column intersection: {columns}')

        # Raise instead of assert: asserts are stripped under `python -O`.
        if len(df.columns) != len(sql_column_names):
            raise ValueError(
                "Mismatch in columns between the dataframe and the sql table. \ndf.columns: "
                + ", ".join(df.columns) + "\nsql columns: " + ", ".join(sql_column_names))

        data_to_insert = df.loc[:, columns]
        values = [tuple(row) for row in data_to_insert.values]

        logger.info(f'the shape of the table which is going to be imported {data_to_insert.shape}')

        # join() also covers the single-column case (the original
        # interpolated the raw python lists into the SQL for one column).
        cols = ', '.join(columns)
        params = ', '.join('?' * len(columns))

        logger.info(f'insert structure: colnames: {cols} params: {params}')
        if values:  # guard: the original crashed on an empty dataframe
            logger.info(values[0])
        query = f"""INSERT INTO {self.table_name} ({cols}) VALUES ({params});"""

        logger.info(f'QUERY: {query}')

        cur = self.exec_many(query, values)

        # sqlite3 cursors expose no `.messages`; some drivers do, so read
        # it defensively instead of a bare `except: pass`.
        for message in getattr(cur, 'messages', ()):
            logger.info(message)

        logger.warning('the data is loaded')


    def from_sql_to_pandas(self, chunksize: int = 64) -> pd.DataFrame:
        """Converts the table into a pandas dataframe and returns it,
        reading the table in `chunksize`-sized chunks

        Args:
            chunksize (int, optional): Number of rows per sql request. Defaults to 64.

        Returns:
            pd.DataFrame: data
        """

        offset = 0
        dfs = []

        while True:
            query = f"""
            SELECT * FROM {self.table_name}
                LIMIT {chunksize}  
                OFFSET {offset}
            """
            data = pd.read_sql_query(query, self.cnxn)
            self.cnxn.commit()

            logger.info(f'the shape of the chunk: {data.shape}')

            dfs.append(data)
            offset += chunksize

            # A short (possibly empty) chunk means the table is exhausted.
            if len(data) < chunksize:
                logger.info('loading the data from SQL is finished')
                break

        return pd.concat(dfs)


    def update_table(self, set_values: dict, condition: str):
        """Updates the values of some fields of the rows matching `condition`

        Args:
            set_values (dict): which columns to assign which values
            condition (str): raw SQL condition upon which to update; it is
                interpolated directly into the query, so it must be trusted
        """

        if not set_values:
            logger.warning('No values to update. Provide set_values.')
            return

        set_clause = ', '.join(f"{col} = ?" for col in set_values.keys())
        values = list(set_values.values())

        query = f"""
        UPDATE {self.table_name}
        SET {set_clause}
        WHERE {condition};
        """

        # `values` is always a list here; no iterable check needed.
        cur = self.exec(query, values)

        logger.info(f"Rows updated: {cur.rowcount}")


    def update_one(self, id: int, **kwargs: dict):
        """Updates a single row in the table

        Args:
            id (int): primary-key value of the row to update
        """

        cond = self.pk + " = " + str(id)
        self.update_table(kwargs, cond)


    def select_one(self, id: int, cols: list = None) -> dict:
        """Selects only one row and returns it as a python dictionary

        Args:
            id (int): id of row
            cols (list, optional): Which columns to select. Selects all
                columns when empty or None. Defaults to None.

        Returns:
            dict: row
        """

        cols = list(cols) if cols else []  # avoid a mutable default argument
        query = f"select {'*' if not cols else ', '.join(cols)} from {self.table_name} where {self.pk} = ?"
        row = list(self.exec(query, (id,)))[0]

        keys = self.get_table_columns() if not cols else cols
        return dict(zip(keys, row))


    def get_next_id(self):
        """Conveniently gets the increment of the previous id (used for creating a new entry)"""

        # max(pk) is NULL (-> None) on an empty table; start ids at 0 then.
        query = f"select (max({self.pk}) + 1) from {self.table_name};"
        row = list(self.exec(query))[0]

        return row[0] if row[0] is not None else 0

__enter__()

Supports Python's `with`-statement syntax

Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
39
40
41
def __enter__(self):
    """Interfaces python `with` syntax"""
    # The handler itself is bound to the `as` target of the with-statement.
    return self

__exit__(*args, **kwargs)

Exits the python with scope

Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
44
45
46
47
def __exit__(self, *args, **kwargs):
    """Exits the python `with` scope"""

    # Commits and closes the connection whether or not the with-block
    # raised (exception info arrives in *args and is ignored here).
    self.close_cnxn()

__init__(table_name)

Initializes the sql handler of the table table_name

Parameters:

Name Type Description Default
table_name str

Which table to tether the handler to

required
Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
24
25
26
27
28
29
30
31
32
33
34
35
36
def __init__(self, table_name: str) -> None:
    """Initializes the sql handler of the table `table_name`

    Args:
        table_name (str): Which table to tether the handler to
    """
    super().__init__()

    # NOTE(review): `db_path` is a module-level name defined elsewhere.
    self.cnxn = sqlite3.connect(db_path)
    self.table_name = table_name

    # Resolve the table's primary-key column name via sqlite's
    # pragma_table_info; raises IndexError if the table has no pk row.
    cur = self.exec(f'SELECT l.name FROM pragma_table_info("{table_name}") as l WHERE l.pk = 1;')
    self.pk = list(cur)[0][0]

close_cnxn()

Close the connection gracefully

Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
50
51
52
53
54
55
56
57
def close_cnxn(self)->None:
    """Close the connection gracefully"""

    # Flush any pending transaction before closing so no writes are lost.
    logger.info('commiting the changes')
    self.cnxn.commit()
    logger.debug('closing connection')
    self.cnxn.close()
    logger.info('the connection has been closed')

drop_table()

Drops the table from the database

Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
90
91
92
93
94
95
96
97
def drop_table(self):
    """Drops the table from the database"""

    # IF EXISTS makes the drop idempotent (no error if already gone).
    query = f"DROP TABLE IF EXISTS {self.table_name};"
    logging.info(query)

    self.exec(query)
    logging.info(f"table '{self.table_name}' deleted.")

from_sql_to_pandas(chunksize=64)

Converts the table into a pandas dataframe and returns it, reads the table in chunksize-sized chunks

Parameters:

Name Type Description Default
chunksize int

Number of rows per sql request. Defaults to 64.

64

Returns:

Type Description
DataFrame

pd.DataFrame: data

Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def from_sql_to_pandas(self, chunksize:int=64) -> pd.DataFrame:
    """Converts the table into a pandas dataframe and returns it, 
    reads the table in `chunksize`-sized chunks

    Args:
        chunksize (int, optional): Number of rows per sql request. Defaults to 64.

    Returns:
        pd.DataFrame: data
    """        

    offset=0
    dfs=[]

    # Page through the table with LIMIT/OFFSET until a short chunk appears.
    while True:
        query=f"""
        SELECT * FROM {self.table_name}
            LIMIT {chunksize}  
            OFFSET {offset}
        """
        data = pd.read_sql_query(query, self.cnxn)
        self.cnxn.commit()

        logger.info(f'the shape of the chunk: {data.shape}')

        dfs.append(data)
        offset += chunksize

        # A chunk shorter than `chunksize` means the table is exhausted.
        if len(dfs[-1]) < chunksize:
            logger.info('loading the data from SQL is finished')
            # NOTE(review): the connection is NOT closed here; this log
            # message is misleading.
            logger.debug('connection is closed')
            break

    df = pd.concat(dfs)

    return df

get_next_id()

Conveniently gets the increment of the previous id (used for creating a new entry)

Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
241
242
243
244
245
246
247
248
def get_next_id(self):
    """Conveniently gets the increment of the previous id (used for creating a new entry)"""

    # max(pk) over an empty table yields NULL -> None, so ids start at 0.
    query = f"select (max({self.pk}) + 1) from {self.table_name};"
    cur = list(self.exec(query))
    cur = cur[0]  # the single result row (a 1-tuple)

    return 0 if cur is None else cur[0] if cur[0] is not None else 0

get_table_columns()

Gets a list of the column names that belong to the table

Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
70
71
72
73
74
75
76
77
78
79
def get_table_columns(self) -> list:
    """Gets a list of the column names that belong to the table"""

    cur = self.exec(f"PRAGMA table_info({self.table_name});")
    columns = cur.fetchall()

    # Each pragma row is (cid, name, type, notnull, dflt_value, pk);
    # index 1 is the column name.
    column_names = [col[1] for col in columns]
    logger.info(f'the list of columns: {column_names}')

    return column_names

insert_many(df)

Insert many values at once into the table

Parameters:

Name Type Description Default
df DataFrame

data to insert

required
Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def insert_many(self, df: pd.DataFrame):
    """Insert many values at once into the table

    Args:
        df (pd.DataFrame): data to insert
    """

    df = df.replace(np.nan, None)  # sqlite stores NULL, not NaN
    df.rename(columns = lambda x: x.lower(), inplace=True)

    columns = list(df.columns)
    logger.info(f'BEFORE the column intersection: {columns}')

    # Keep only dataframe columns that exist in the table (case-insensitive).
    sql_column_names = [i.lower() for i in self.get_table_columns()]
    columns = list(set(columns) & set(sql_column_names))
    logger.info(f'AFTER the column intersection: {columns}')

    # NOTE(review): assert is stripped under `python -O`; a ValueError
    # would be safer for this validation.
    assert len(df.columns) == len(sql_column_names), "Mismatch in columns between the dataframe and the sql table. \ndf.columns: " + \
        ", ".join(df.columns) + "\nsql columns: " + ", ".join(sql_column_names)

    ncolumns = list(len(columns) * '?')  # one '?' placeholder per column
    data_to_insert = df.loc[:, columns]
    values = [tuple(i) for i in data_to_insert.values]

    logger.info(f'the shape of the table which is going to be imported {data_to_insert.shape}')

    if len(columns)>1:
        cols, params =', '.join(columns), ', '.join(ncolumns)
    else:
        # NOTE(review): with exactly one column this leaves `cols`/`params`
        # as python lists, which interpolate as "['name']" into the SQL
        # below — almost certainly a bug.
        cols, params = columns, ncolumns

    logger.info(f'insert structure: colnames: {cols} params: {params}')
    # NOTE(review): values[0] raises IndexError when the dataframe is empty.
    logger.info(values[0])
    query=f"""INSERT INTO {self.table_name} ({cols}) VALUES ({params});"""

    logger.info(f'QUERY: {query}')

    cur = self.exec_many(query, values)

    # sqlite3 cursors have no `.messages` attribute, so this normally
    # falls through to the (bare) except and does nothing.
    try:
        for i in cur.messages:
            logger.info(i)
    except:
        pass

    logger.warning('the data is loaded')

insert_one(**kwargs)

Insert a single row into the table

Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
60
61
62
63
64
65
66
67
def insert_one(self, **kwargs) -> None:
    """Insert a single row into the table

    Args:
        **kwargs: column-name / value pairs for the new row
    """

    # Values are bound as '?' parameters, never interpolated into the SQL.
    query = f"INSERT INTO {self.table_name} ({', '.join([k for k in kwargs.keys()])}) VALUES ({', '.join(['?'] * len(kwargs))});"
    # NOTE(review): passing `query` as the logging format string with an
    # extra args list misuses lazy %-formatting (query has no % markers).
    logging.debug(query, [v for v in kwargs.values()])

    cur = self.exec(query, [v for v in kwargs.values()])
    logging.info("insert_one: " + str(list(cur)))

select_one(id, cols=[])

Selects only one row and returns it as a python dictionary

Parameters:

Name Type Description Default
id int

id of row

required
cols list

Which columns to select. Selects all columns if the list is empty. Defaults to [].

[]

Returns:

Name Type Description
dict dict

row

Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def select_one(self, id: int, cols: list = []) -> dict:
    """Selects only one row and returns it as a python dictionary

    Args:
        id (int): id of row
        cols (list, optional): Which columns to select. Selects all columns if the list is empty. Defaults to [].

    Returns:
        dict: row
    """

    # Match on the primary key resolved in __init__; the id itself is
    # bound as a parameter.
    cond = self.pk + " = ?"
    query = f"select {'*' if len(cols) == 0 else ', '.join(cols)} from {self.table_name} where {cond}"
    cur = self.exec(query, (id, ))

    # Zip column names with the first (only) result row.
    # NOTE(review): raises IndexError if no row matches `id`.
    return {k:v for k, v in zip(self.get_table_columns() if len(cols) == 0 else cols, list(cur)[0])}

truncate_table()

Deletes all rows of the table

Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
82
83
84
85
86
87
def truncate_table(self) -> None:
    """Deletes all rows of the table"""

    # DELETE without dropping: the table and its schema remain.
    query=f"delete from {self.table_name} where 1=1;"
    logging.info(f'the {self.table_name} is being truncated')
    self.exec(query)

update_one(id, **kwargs)

Updates a single row in the table

Parameters:

Name Type Description Default
id int

row to update

required
Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
212
213
214
215
216
217
218
219
220
def update_one(self, id: int, **kwargs: dict):
    """Updates a single row in the table

    Args:
        id (int): primary-key value of the row to update
    """

    # NOTE(review): `id` is interpolated into the condition string rather
    # than bound as a parameter — safe only for integer ids.
    cond = self.pk + " = " + str(id)
    self.update_table(kwargs, cond)

update_table(set_values, condition)

Updates the values of some fields of some rows (based on the condition)

Parameters:

Name Type Description Default
set_values dict

which columns to assign which values

required
condition str

condition upon which to update

required
Source code in bayesian_ab_testing\data_preparation\sql_interactions.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def update_table(self, set_values: dict, condition: str):
    """Updates the values of some fields of some rows (based on the condition)

    Args:
        set_values (dict): which columns to assign which values
        condition (str): raw SQL condition upon which to update; it is
            interpolated directly into the query, so it must be trusted
    """

    if not set_values:
        logger.warning('No values to update. Provide set_values.')
        return

    # Values are bound as '?' parameters; only column names and the
    # caller-supplied condition end up in the SQL text.
    set_clause = ', '.join(f"{col} = ?" for col in set_values.keys())
    values = list(set_values.values())

    query = f"""
    UPDATE {self.table_name}
    SET {set_clause}
    WHERE {condition};
    """

    # NOTE(review): `values` is always a list here, so the hasattr branch
    # is dead code.
    cur = self.exec(query, values if hasattr(values, "__iter__") else list(values))

    logger.info(f"Rows updated: {cur.rowcount}")