
add a class for hashing pieces in order

This will be used to allow parallel processing of torrent pieces.

Each piece of the torrent can be processed in parallel, and this
class makes sure that when computing the hash of a file in the
torrent, the data is hashed in the correct order.
commit 5ee796735b (main)
John-Mark Gurney, 1 year ago
1 changed file with 104 additions and 12 deletions

ui/medashare/btv/__init__.py (+104, -12)
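The commit message above says the new class exists so that torrent pieces can be processed in parallel while each file's data is still hashed front to back. As a rough sketch of that intended use (not part of this commit: the helper name, the read_piece callback, and the piece layout are assumptions, and the import path is only guessed from the file's location), pieces can be read or verified in worker threads while a single thread submits their data in whatever order they finish:

import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed

from medashare.btv import HashInOrder	# import path assumed

def hash_file_in_parallel(read_piece, npieces, piece_length):
	# one in-order hasher for the whole file
	hio = HashInOrder(hashlib.sha512())

	with ThreadPoolExecutor() as pool:
		# read (or verify) the pieces concurrently ...
		futs = { pool.submit(read_piece, i): i for i in range(npieces) }

		# ... but feed the data to HashInOrder from this one thread,
		# in completion order; the class restores the byte order
		# from the offsets
		for fut in as_completed(futs):
			idx = futs[fut]
			hio.submit(idx * piece_length, fut.result())

	return hio.digest()

Submissions are funneled through a single thread here because the class itself does no locking.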

@@ -7,6 +7,7 @@ import importlib.resources
 import itertools
 import os
 import pathlib
+import random
 import shutil
 import sys
 import tempfile
@@ -26,6 +27,58 @@ def roundup(x, y):
 
 	return (x + y - 1) // y
 
+class HashInOrder:
+	'''Takes a hashlib style object, which is required to implement
+	the update and digest methods.
+
+	Use the submit method to submit blocks of data.  It does not
+	matter in what order the data is submitted; it will be hashed
+	in the correct order.'''
+
+	def __init__(self, hashobj):
+		self._hobj = hashobj
+		self._curpos = 0
+		self._pending = {}
+
+	@property
+	def pos(self):
+		'''Current position of the hash.
+
+		Note: this does not include pieces that have been
+		submitted but not yet hashed.  It is the number of
+		bytes hashed so far; if the digest method returns a
+		value, all pending pieces have been hashed.
+		'''
+
+		return self._curpos
+
+	def submit(self, off, data):
+		'''Submit data to be hashed that resides at offset off.
+		Offsets start at 0 and advance by the length of the
+		data submitted.'''
+
+		if off in self._pending:
+			raise RuntimeError('offset %d already submitted' % off)
+
+		if off < self._curpos:
+			raise RuntimeError('offset %d previously submitted' % off)
+
+		self._pending[off] = data
+
+		# hash any contiguous run of pending data starting at
+		# the current position
+		pos = self._curpos
+		while pos in self._pending:
+			data = self._pending.pop(pos)
+			self._hobj.update(data)
+			pos += len(data)
+
+		self._curpos = pos
+
+	def digest(self):
+		if self._pending:
+			raise RuntimeError('data previously submitted has not been hashed')
+
+		return self._hobj.digest()
+
 class Storage:
 	'''A class to help read pieces of a torrent.
 	'''
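In isolation, the class added above can be exercised roughly like this (a sketch that mirrors the unit test added further down; the sizes are arbitrary and the import path is assumed from the file's location):

import hashlib
import random

from medashare.btv import HashInOrder	# import path assumed

data = random.randbytes(1000)
hio = HashInOrder(hashlib.sha512())

# split the data into 100-byte blocks and submit them out of order
blks = [ (off, data[off:off + 100]) for off in range(0, len(data), 100) ]
random.shuffle(blks)

for off, blk in blks:
	hio.submit(off, blk)

assert hio.pos == len(data)
assert hio.digest() == hashlib.sha512(data).digest()
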
@@ -197,15 +250,12 @@ def validate(torrent, basedir, with_file_hashes=None):
 	def apply_fun(data, fname, offset):
 		if with_file_hashes is not None:
 			try:
-				hashobj, curoff = file_hashes[fname]
+				hio = file_hashes[fname]
 			except KeyError:
-				hashobj, curoff = with_file_hashes(), 0
-				file_hashes[fname] = hashobj, curoff
+				hio = HashInOrder(with_file_hashes())
+				file_hashes[fname] = hio
 
-			if curoff == offset:
-				hashobj.update(data)
-				file_hashes[fname] = (hashobj, offset +
-				    len(data))
+			hio.submit(offset, data)
 
 		hash.update(data)
@@ -238,9 +288,9 @@ def validate(torrent, basedir, with_file_hashes=None):
 			f, e = e, f
 
 		if with_file_hashes:
-			file_hashes = { torrentdir: hashobj.digest() for fname, (hashobj,
-			    off) in file_hashes.items() if info['length'] == off and
-			    torrentdir in f }
+			file_hashes = { torrentdir: hio.digest() for fname, hio
+			    in file_hashes.items() if info['length'] ==
+			    hio.pos and torrentdir in f }
 			return f, e, file_hashes
 
 		return f, e
@@ -253,8 +303,8 @@ def validate(torrent, basedir, with_file_hashes=None):
 
 	r = (allfiles - badfiles, badfiles,)
 
-	file_hashes = { torrentdir / fname: hashobj.digest() for fname, (hashobj,
-	    off) in file_hashes.items() if filesizes[fname] == off and
+	file_hashes = { torrentdir / fname: hio.digest() for fname, hio in
+	    file_hashes.items() if filesizes[fname] == hio.pos and
 	    (torrentdir / fname) in r[0] }
 
 	if with_file_hashes is not None:
@@ -311,6 +361,48 @@ class _TestCases(unittest.TestCase):
 			with open(k, 'wb') as fp:
 				fp.write(v)
 
+	def test_hashinorder(self):
+		hashobj = sha512
+
+		testdata = random.randbytes(500)
+
+		dgst = hashobj(testdata).digest()
+
+		hio = HashInOrder(hashobj())
+
+		blksiz = 15
+
+		blks = [ (x, testdata[x: x + blksiz]) for x in range(0, len(testdata), blksiz) ]
+
+		idx, data = blks.pop(-1)
+
+		# add the last block of data
+		hio.submit(idx, data)
+
+		# raise an exception when submitting the same blk again
+		with self.assertRaises(RuntimeError):
+			hio.submit(idx, data)
+
+		# make sure that if we ask for a digest w/ data we haven't
+		# yet processed, we raise an exception
+		with self.assertRaises(RuntimeError):
+			hio.digest()
+
+		# shuffle the data
+		random.shuffle(blks)
+
+		# submit the shuffled data
+		for idx, data in blks:
+			hio.submit(idx, data)
+
+		# raise an exception when submitting an earlier blk
+		with self.assertRaises(RuntimeError):
+			hio.submit(0, b'dflkj')
+
+		self.assertEqual(len(testdata), hio.pos)
+
+		self.assertEqual(dgst, hio.digest())
+
 	def test_completeverif(self):
 		tf = self.basetempdir / 'a.torrent'
 		with open(tf, 'wb') as fp:
