Skip to content

Commit fedd075

Browse files
committed
[blockchain] Add functionality to batch insert blocks during database synchronization
1 parent 52f8a9f commit fedd075

8 files changed

Lines changed: 201 additions & 67 deletions

File tree

blockchain/explorer.py

Lines changed: 89 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ def __init__(self, log_queue):
3636
# Get some configuration parameters
3737
self.config = BlockchainConfig.config
3838
self.poll_interval = self.config["explorer"]["poll_interval"]
39+
self.batch_size = self.config["explorer"]["batch_size"]
3940
self.addresses_config = self.config["api"]["caching"]["scripts"]["addresses"]
4041

4142
# Set up logger
@@ -117,6 +118,52 @@ def insert_block(self, database, block_hash_hex_str, block, epoch, tapi_periods)
117118

118119
return block_json
119120

121+
def batch_insert_blocks(self, database, block_hashes, blocks, epochs, tapi_periods):
122+
addresses = {}
123+
batched_transactions = {}
124+
for block_hash, block, epoch in zip(block_hashes, blocks, epochs):
125+
# Create block object and parse it to a JSON object
126+
block = Block(
127+
block=block,
128+
block_hash=block_hash,
129+
log_queue=self.log_queue,
130+
database=self.insert_blocks_database,
131+
tapi_periods=tapi_periods,
132+
witnet_node=self.insert_blocks_node,
133+
transaction_batch=batched_transactions,
134+
)
135+
block_json = block.process_block("explorer")
136+
137+
# Insert block
138+
database.insert_block(block_json)
139+
140+
# Insert transactions
141+
transactions = self.insert_transactions(database, block_json, epoch)
142+
batched_transactions.update(transactions)
143+
144+
# Insert address data
145+
block_addresses = block.process_addresses(as_dict=True)
146+
for address, data in block_addresses.items():
147+
if address not in addresses:
148+
addresses[address] = [address, epoch] + data
149+
else:
150+
addresses[address][1] = epoch
151+
addresses[address][2] += data[0]
152+
addresses[address][3] += data[1]
153+
addresses[address][4] += data[2]
154+
addresses[address][5] += data[3]
155+
addresses[address][6] += data[4]
156+
addresses[address][7] += data[5]
157+
addresses[address][8] += data[6]
158+
addresses[address][9] += data[7]
159+
addresses[address][10] += data[8]
160+
161+
# Insert all address data
162+
database.insert_addresses(list(addresses.values()))
163+
164+
# Finalize insertions and updates on every block
165+
database.finalize(epochs)
166+
120167
def update_cached_views(self, block_json, logger, caching_server):
121168
epoch = block_json["details"]["epoch"]
122169

@@ -263,36 +310,49 @@ def update_cached_views(self, block_json, logger, caching_server):
263310
self.try_send_request(logger, caching_server, request)
264311

265312
def insert_transactions(self, database, block_json, epoch):
313+
transactions = {}
314+
266315
# Insert mint transaction
267-
database.insert_mint_txn(block_json["transactions"]["mint"], epoch)
316+
mint_txn = block_json["transactions"]["mint"]
317+
database.insert_mint_txn(mint_txn, epoch)
318+
transactions[mint_txn["hash"]] = mint_txn
268319

269320
# Insert value transfer transactions
270321
for txn_details in block_json["transactions"]["value_transfer"]:
271322
database.insert_value_transfer_txn(txn_details, epoch)
323+
transactions[txn_details["hash"]] = txn_details
272324

273325
# Insert data request transactions
274326
for txn_details in block_json["transactions"]["data_request"]:
275327
database.insert_data_request_txn(txn_details, epoch)
328+
transactions[txn_details["hash"]] = txn_details
276329

277330
# Insert commit transactions
278331
for txn_details in block_json["transactions"]["commit"]:
279332
database.insert_commit_txn(txn_details, epoch)
333+
transactions[txn_details["hash"]] = txn_details
280334

281335
# Insert reveal transactions
282336
for txn_details in block_json["transactions"]["reveal"]:
283337
database.insert_reveal_txn(txn_details, epoch)
338+
transactions[txn_details["hash"]] = txn_details
284339

285340
# Insert tally transactions
286341
for txn_details in block_json["transactions"]["tally"]:
287342
database.insert_tally_txn(txn_details, epoch)
343+
transactions[txn_details["hash"]] = txn_details
288344

289345
# Insert tally transactions
290346
for txn_details in block_json["transactions"]["stake"]:
291347
database.insert_stake_txn(txn_details, epoch)
348+
transactions[txn_details["hash"]] = txn_details
292349

293350
# Insert tally transactions
294351
for txn_details in block_json["transactions"]["unstake"]:
295352
database.insert_unstake_txn(txn_details, epoch)
353+
transactions[txn_details["hash"]] = txn_details
354+
355+
return transactions
296356

297357
def insert_blocks_and_transactions(self, log_queue, unconfirmed_blocks_queue):
298358
# Set up logger
@@ -347,8 +407,9 @@ def insert_blocks_and_transactions(self, log_queue, unconfirmed_blocks_queue):
347407
else:
348408
blockchain = blockchain["result"]
349409

410+
block_hashes, blocks, epochs = [], [], []
350411
for epoch, block_hash_hex_str in blockchain:
351-
logger.info(f"Inserting data for epoch {epoch}")
412+
logger.info(f"Fetching data for epoch {epoch}")
352413

353414
block = self.insert_blocks_node.get_block(block_hash_hex_str)
354415
# The database entries related to this block have not been modified yet
@@ -361,16 +422,33 @@ def insert_blocks_and_transactions(self, log_queue, unconfirmed_blocks_queue):
361422
block = block["result"]
362423

363424
# Insert block
364-
block_json = self.insert_block(
365-
self.insert_blocks_database,
366-
block_hash_hex_str,
367-
block,
368-
epoch,
369-
tapi_periods,
370-
)
425+
if len(blockchain) < self.batch_size:
426+
block_json = self.insert_block(
427+
self.insert_blocks_database,
428+
block_hash_hex_str,
429+
block,
430+
epoch,
431+
tapi_periods,
432+
)
371433

372-
# Update all cached views
373-
self.update_cached_views(block_json, logger, caching_server)
434+
# Update all cached views
435+
self.update_cached_views(block_json, logger, caching_server)
436+
# Batch insert block if they older than the batch size
437+
else:
438+
block_hashes.append(block_hash_hex_str)
439+
blocks.append(block)
440+
epochs.append(epoch)
441+
if len(block_hashes) == self.batch_size:
442+
self.batch_insert_blocks(
443+
self.insert_blocks_database,
444+
block_hashes,
445+
blocks,
446+
epochs,
447+
tapi_periods,
448+
)
449+
block_hashes = []
450+
blocks = []
451+
epochs = []
374452

375453
# Check if the block is confirmed and if it isn't track the hash
376454
confirmed = block["confirmed"]

blockchain/objects/block.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def __init__(
2828
database=None,
2929
tapi_periods=None,
3030
witnet_node=None,
31+
transaction_batch=None,
3132
):
3233
self.block = block
3334
self.block_hash = block_hash
@@ -69,6 +70,8 @@ def __init__(
6970

7071
self.tapi_periods = tapi_periods
7172

73+
self.transaction_batch = transaction_batch
74+
7275
def configure_logging_process(self, queue, label):
7376
handler = logging.handlers.QueueHandler(queue)
7477
root = logging.getLogger(label)
@@ -228,6 +231,7 @@ def process_value_transfer_txns(self, call_from):
228231
if len(self.block["txns_hashes"]["value_transfer"]) > 0:
229232
value_transfer = ValueTransfer(
230233
database=self.database,
234+
transaction_batch=self.transaction_batch,
231235
logger=self.logger,
232236
witnet_node=self.witnet_node,
233237
)
@@ -251,6 +255,7 @@ def process_data_request_txns(self, call_from):
251255
if len(self.block["txns_hashes"]["data_request"]) > 0:
252256
data_request = DataRequest(
253257
database=self.database,
258+
transaction_batch=self.transaction_batch,
254259
logger=self.logger,
255260
witnet_node=self.witnet_node,
256261
)
@@ -274,6 +279,7 @@ def process_commit_txns(self, call_from):
274279
if len(self.block["txns_hashes"]["commit"]) > 0:
275280
commit = Commit(
276281
database=self.database,
282+
transaction_batch=self.transaction_batch,
277283
logger=self.logger,
278284
witnet_node=self.witnet_node,
279285
)
@@ -288,6 +294,7 @@ def process_reveal_txns(self, call_from):
288294
if len(self.block["txns_hashes"]["reveal"]) > 0:
289295
reveal = Reveal(
290296
database=self.database,
297+
transaction_batch=self.transaction_batch,
291298
logger=self.logger,
292299
witnet_node=self.witnet_node,
293300
)
@@ -319,6 +326,7 @@ def process_stake_txns(self, call_from):
319326
):
320327
stake = Stake(
321328
database=self.database,
329+
transaction_batch=self.transaction_batch,
322330
logger=self.logger,
323331
witnet_node=self.witnet_node,
324332
)

blockchain/transactions/commit.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def process_transaction(self, call_from):
2929
fee = get_commit_and_reveal_fee_for_data_request(
3030
self.database,
3131
self.txn_details["data_request"],
32+
transaction_batch=self.transaction_batch,
3233
)
3334
if "fee" in fee:
3435
self.txn_details["fee"] = fee["fee"]
@@ -43,6 +44,7 @@ def process_transaction(self, call_from):
4344
collateral = get_collateral_for_data_request(
4445
self.database,
4546
self.txn_details["data_request"],
47+
transaction_batch=self.transaction_batch,
4648
)
4749
if "collateral" in collateral:
4850
self.txn_details["collateral"] = collateral["collateral"]

blockchain/transactions/data_request.py

Lines changed: 54 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -320,46 +320,62 @@ def calculate_fees(
320320
return dro_fee, miner_fee
321321

322322

323-
def get_commit_and_reveal_fee_for_data_request(database, data_request_hash):
324-
sql = """
325-
SELECT
326-
data_request_txns.commit_and_reveal_fee
327-
FROM
328-
data_request_txns
329-
WHERE
330-
data_request_txns.txn_hash=%s
331-
LIMIT 1
332-
"""
333-
result = database.sql_return_one(
334-
sql,
335-
parameters=[bytearray.fromhex(data_request_hash)],
336-
)
337-
338-
if result:
339-
return {"fee": result[0]}
323+
def get_commit_and_reveal_fee_for_data_request(
324+
database,
325+
data_request_hash,
326+
transaction_batch=None,
327+
):
328+
if transaction_batch is not None and data_request_hash in transaction_batch:
329+
data_request_txn = transaction_batch[data_request_hash]
330+
return {"fee": data_request_txn["commit_and_reveal_fee"]}
340331
else:
341-
return {"error": "transaction not found"}
342-
343-
344-
def get_collateral_for_data_request(database, data_request_hash):
345-
sql = """
346-
SELECT
347-
data_request_txns.collateral
348-
FROM
349-
data_request_txns
350-
WHERE
351-
data_request_txns.txn_hash=%s
352-
LIMIT 1
353-
"""
354-
result = database.sql_return_one(
355-
sql,
356-
parameters=[bytearray.fromhex(data_request_hash)],
357-
)
358-
359-
if result:
360-
return {"collateral": result[0]}
332+
sql = """
333+
SELECT
334+
data_request_txns.commit_and_reveal_fee
335+
FROM
336+
data_request_txns
337+
WHERE
338+
data_request_txns.txn_hash=%s
339+
LIMIT 1
340+
"""
341+
result = database.sql_return_one(
342+
sql,
343+
parameters=[bytearray.fromhex(data_request_hash)],
344+
)
345+
346+
if result:
347+
return {"fee": result[0]}
348+
else:
349+
return {"error": "transaction not found"}
350+
351+
352+
def get_collateral_for_data_request(
353+
database,
354+
data_request_hash,
355+
transaction_batch=None,
356+
):
357+
if transaction_batch is not None and data_request_hash in transaction_batch:
358+
data_request_txn = transaction_batch[data_request_hash]
359+
return {"collateral": data_request_txn["collateral"]}
361360
else:
362-
return {"error": "transaction not found"}
361+
sql = """
362+
SELECT
363+
data_request_txns.collateral
364+
FROM
365+
data_request_txns
366+
WHERE
367+
data_request_txns.txn_hash=%s
368+
LIMIT 1
369+
"""
370+
result = database.sql_return_one(
371+
sql,
372+
parameters=[bytearray.fromhex(data_request_hash)],
373+
)
374+
375+
if result:
376+
return {"collateral": result[0]}
377+
else:
378+
return {"error": "transaction not found"}
363379

364380

365381
def build_retrieval(kinds, urls, all_headers, bodies, scripts):

blockchain/transactions/reveal.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def process_transaction(self, call_from):
3333
fee = get_commit_and_reveal_fee_for_data_request(
3434
self.database,
3535
self.txn_details["data_request"],
36+
transaction_batch=self.transaction_batch,
3637
)
3738
if "fee" in fee:
3839
self.txn_details["fee"] = fee["fee"]

0 commit comments

Comments
 (0)