Dask Bag.to_textfiles एकल विभाजन के साथ काम करता है लेकिन एकाधिक - पायथन, डस्क नहीं

बहुत संक्षेप में .. क्या यह एक बग है या क्या मुझे कुछ याद आ रही है? tmp_j एक बैग और 6 विभाजन के साथ एक बैग है। हालांकि, मुझे मिलता है बड़े बैग के साथ समान प्रतिक्रियाएं।

इस विशेष बैग के साथ बनाया गया था:

>>> tmp_j = jnode_b.filter(lambda r: (r["node"]["attrib"]["uid"] == "8909") &
(r["node"]["attrib"]["version"] == "1")).pluck("node").pluck("attire")

और ऐसा लगता है:

>>> tmp_j.compute()

[{"changeset": "39455176",
"id": "4197394169",
"lat": "53.4803608",
"lon": "-113.4955328",
"timestamp": "2016-05-20T16:43:02Z",
"uid": "8909",
"user": "mvexel",
"version": "1"}]

एक बार फिर धन्यवाद..

>>> tmp_j.repartition(1).map(json.dumps).to_textfiles("tmpA*.json")

सही ढंग से काम करता है, (फ़ाइल लिखता है), लेकिन

>>> tmp_j.map(json.dumps).to_textfiles("tmpA*.json")

देता है

StopIteration                             Traceback (most recent call last)
<ipython-input-28-a77a33e2ff26> in <module>()
----> 1 tmp_j.map(json.dumps).to_textfiles("tmp*.json")

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py in to_textfiles(self, path, name_function, compression, encoding, compute)
469     def to_textfiles(self, path, name_function=str, compression="infer",
470                      encoding=system_encoding, compute=True):
--> 471         return to_textfiles(self, path, name_function, compression, encoding, compute)
472
473     def fold(self, binop, combine=None, initial=no_default, split_every=None):

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py in to_textfiles(b, path, name_function, compression, encoding, compute)
167     result = Bag(merge(b.dask, dsk), name, b.npartitions)
168     if compute:
--> 169         result.compute()
170     else:
171         return result

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
35
36     def compute(self, **kwargs):
---> 37         return compute(self, **kwargs)[0]
38
39     @classmethod

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
108                 for opt, val in groups.items()])
109     keys = [var._keys() for var in variables]
--> 110     results = get(dsk, keys, **kwargs)
111
112     results_iter = iter(results)

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/multiprocessing.py in get(dsk, keys, optimizations, num_workers, func_loads, func_dumps, **kwargs)
76         # Run
77         result = get_async(apply_async, len(pool._pool), dsk3, keys,
---> 78                            queue=queue, get_id=_process_get_id, **kwargs)
79     finally:
80         if cleanup:

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
486                 _execute_task(task, data)  # Re-execute locally
487             else:
--> 488                 raise(remote_exception(res, tb))
489         state["cache"][key] = res
490         finish_task(dsk, key, state, results, keyorder.get)

StopIteration:

Traceback
---------
File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py", line 267, in execute_task
result = _execute_task(task, data)
File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py", line 249, in _execute_task
return func(*args2)
File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py", line 1024, in write
firstline = next(data)

नोट: वह है

>>> tmp_b = db.from_sequence(tmp_j,partition_size=3)
>>> tmp_b.map(json.dumps).to_textfiles("tmp*.json")

ठीक काम करता है (लेकिन फिर से, tmp_b.npartitions == 1)।

अंतर्दृष्टि के लिए फिर से धन्यवाद - मैंने स्रोत को देखा लेकिन फिर महसूस किया कि मेरा स्मार्ट / आलसी अनुपात बहुत कम था।

मैं डॉक्स जमा करूंगा जब मुझे "आत्मविश्वास है" मुझे इस पर पकड़ मिली है।

उत्तर:

उत्तर № 1 के लिए 1

यह एक वास्तविक बग था और अब मास्टर में हल हो गया है

In [1]: import dask.bag as db

In [2]: db.range(5, npartitions=5).filter(lambda x: x == 1).map(str).to_textfiles("*.txt")

In [3]: ls *.txt
0.txt  1.txt  2.txt  3.txt  4.txt  C:nppdf32Logdebuglog.txt  foo.txt

संबंधित सवाल
सबसे लोकप्रिय