| 1 |
# -*- coding: utf-8 -*- |
| 2 |
from __future__ import division, print_function, unicode_literals |
| 3 |
|
| 4 |
import struct |
| 5 |
import operator |
| 6 |
import unittest |
| 7 |
from functools import reduce |
| 8 |
|
| 9 |
import testsupport as ts |
| 10 |
from pyradlib import lcompare |
| 11 |
from pyradlib.pyrad_proc import PIPE, Error, ProcMixin |
| 12 |
|
| 13 |
|
| 14 |
class PyradprocTestCase(unittest.TestCase, ProcMixin): |
| 15 |
|
| 16 |
def _runit(self, cmdl, actstrl, _in=None, indata=None, nl=False): |
| 17 |
try: |
| 18 |
procs = [] |
| 19 |
if isinstance(cmdl[0], (type(b''), type(u''))): |
| 20 |
proc = self.call_one(cmdl, actstrl, _in=_in, out=PIPE, |
| 21 |
universal_newlines=nl) |
| 22 |
procs = [proc] |
| 23 |
elif len(cmdl) == 2: |
| 24 |
procs = self.call_two(cmdl[0], cmdl[1], actstrl[0], actstrl[1], |
| 25 |
_in=_in, out=PIPE, universal_newlines=nl) |
| 26 |
else: |
| 27 |
procs = self.call_many(cmdl, actstrl, _in=_in, out=PIPE, |
| 28 |
universal_newlines=nl) |
| 29 |
if indata: # only for small amounts of data |
| 30 |
procs[0].stdin.write(indata) |
| 31 |
procs[0].stdin.close() |
| 32 |
res = procs[-1].stdout.read() |
| 33 |
else: |
| 34 |
res, eres = procs[-1].communicate() |
| 35 |
if eres: print('Errors', eres) # XXX |
| 36 |
except Error as e: |
| 37 |
raise |
| 38 |
self.fail('%s [%s]' % (str(e), self.qjoin(cmdl))) |
| 39 |
finally: |
| 40 |
for proc in procs: |
| 41 |
proc.wait() |
| 42 |
if nl: return filter(None, lcompare.split_rad(res)) |
| 43 |
return res |
| 44 |
|
| 45 |
|
| 46 |
def test_call_1_text(self): |
| 47 |
for n in (5, 10, 55, 200,1328,1329,1330,1331,1332, 2000, 5000,99999): |
| 48 |
exp = range(n) |
| 49 |
cntcmd = ['cnt', str(n)] |
| 50 |
res = self._runit(cntcmd, 'run one process', nl=True) |
| 51 |
resl = [r[0] for r in res] |
| 52 |
try: lcompare.lcompare(resl, exp) |
| 53 |
except lcompare.error as e: |
| 54 |
print(resl, exp) |
| 55 |
self.fail(('call_one_text n=%d -- ' % n) +str(e)) |
| 56 |
|
| 57 |
def test_call_1_bin(self): |
| 58 |
exp = (2160, 8640, 4320, 1080, 7560) |
| 59 |
cntcmd = ['total', '-of'] |
| 60 |
tfn = ts.datafile('histo.dat') |
| 61 |
res = self._runit(cntcmd, 'run one process', _in=tfn, nl=False) |
| 62 |
resl = struct.unpack('f'*len(exp), res) |
| 63 |
try: lcompare.lcompare(resl, exp) |
| 64 |
except lcompare.error as e: |
| 65 |
self.fail(('call_one_text n=%d -- ' % n) +str(e)) |
| 66 |
|
| 67 |
def test_call_1_noexe(self): |
| 68 |
# let's hope a program with that name doesn't exist |
| 69 |
cmd = ['iswaxubdzkcgrhmltqvyfepojn', '-x', '-y', '-z'] |
| 70 |
self.assertRaises(Error, self._runit, cmd, 'run one process') |
| 71 |
|
| 72 |
def test_call_2_text(self): |
| 73 |
# with values higher than 44721, total will return a float in e-format. |
| 74 |
for n in (5, 10, 55, 200,1328,1329,1330,1331,1332, 2000, 5000, |
| 75 |
44721, 44722, 99999): |
| 76 |
exp = [reduce(operator.add, range(n))] |
| 77 |
cntcmd = ['cnt', str(n)] |
| 78 |
totcmd = ['total'] |
| 79 |
resl = self._runit([cntcmd, totcmd], ['run p1','run p2'], nl=True) |
| 80 |
try: lcompare.lcompare(resl, [exp]) |
| 81 |
except lcompare.error as e: |
| 82 |
self.fail(('call_one_text n=%d -- ' % n) +str(e)) |
| 83 |
|
| 84 |
def test_call_2_bin(self): |
| 85 |
for nn in ((1,1,1), (5,5,5), (3,9,5,2,8)): |
| 86 |
totalrows = reduce(operator.mul, nn) |
| 87 |
exp = [reduce(operator.add, range(n))*(totalrows/n) for n in nn] |
| 88 |
cntcmd = ['cnt'] + [str(n) for n in nn] |
| 89 |
totcmd = ['total', '-of'] |
| 90 |
res = self._runit([cntcmd, totcmd], ['run p1','run p2']) |
| 91 |
resl = struct.unpack('f'*len(exp), res) |
| 92 |
try: lcompare.lcompare(resl, exp) |
| 93 |
except lcompare.error as e: |
| 94 |
self.fail(('call_one_text n=%d -- ' % n) +str(e)) |
| 95 |
|
| 96 |
def test_call_many_text(self): |
| 97 |
data = b'''void polygon test 0 0 9 0 0 0 1 0 0 0 1 0''' |
| 98 |
exp = (['#', 'xform', '-s', 0.5], |
| 99 |
['#', 'xform', '-t', 0, 0, 2], |
| 100 |
['#', 'xform', '-ry', -20], |
| 101 |
['#', 'xform', '-rx', 30], |
| 102 |
['void', 'polygon', 'test'], |
| 103 |
[0], [0], [9], |
| 104 |
[0, 0, 1], |
| 105 |
[0.469846310393, 0, 1.17101007167], |
| 106 |
[-0.0855050358315, 0.433012701892, 1.23492315519]) |
| 107 |
cmd0 = ['xform', '-rx', '30'] |
| 108 |
cmd1 = ['xform', '-ry', '-20'] |
| 109 |
cmd2 = ['xform', '-t', '0', '0', '2'] |
| 110 |
cmd3 = ['xform', '-s', '0.5'] |
| 111 |
cmdl = [cmd0, cmd1, cmd2, cmd3] |
| 112 |
resl = self._runit(cmdl, 'run many processes', indata=data, |
| 113 |
_in=PIPE, nl=True) |
| 114 |
try: lcompare.lcompare(resl, exp) |
| 115 |
except lcompare.error as e: |
| 116 |
print(resl, exp) |
| 117 |
self.fail(('call_one_text n=%d -- ' % n) +str(e)) |
| 118 |
|
| 119 |
def test_call_many_bin(self): |
| 120 |
exp = (15, 40, 45, 48, 0) |
| 121 |
cmd0 = ['cnt', '3', '4', '5', '1'] |
| 122 |
cmd1 = ['histo', '1', '5'] |
| 123 |
cmd2 = ['total', '-of'] |
| 124 |
cmdl = [cmd0, cmd1, cmd2] |
| 125 |
res = self._runit(cmdl, 'run many processes') |
| 126 |
resl = struct.unpack('f'*len(exp), res) |
| 127 |
try: lcompare.lcompare(resl, exp) |
| 128 |
except lcompare.error as e: |
| 129 |
print(resl, exp) |
| 130 |
self.fail(('call_one_text -- ') +str(e)) |
| 131 |
|
| 132 |
|
| 133 |
# vi: set ts=4 sw=4 : |