1 |
# -*- coding: utf-8 -*- |
2 |
from __future__ import division, print_function, unicode_literals |
3 |
|
4 |
import struct |
5 |
import operator |
6 |
import unittest |
7 |
from functools import reduce |
8 |
|
9 |
import testsupport as ts |
10 |
from pyradlib import lcompare |
11 |
from pyradlib.pyrad_proc import PIPE, Error, ProcMixin |
12 |
|
13 |
|
14 |
class PyradprocTestCase(unittest.TestCase, ProcMixin): |
15 |
|
16 |
def _runit(self, cmdl, actstrl, _in=None, indata=None, nl=False): |
17 |
try: |
18 |
procs = [] |
19 |
if isinstance(cmdl[0], (type(b''), type(u''))): |
20 |
proc = self.call_one(cmdl, actstrl, _in=_in, out=PIPE, |
21 |
universal_newlines=nl) |
22 |
procs = [proc] |
23 |
elif len(cmdl) == 2: |
24 |
procs = self.call_two(cmdl[0], cmdl[1], actstrl[0], actstrl[1], |
25 |
_in=_in, out=PIPE, universal_newlines=nl) |
26 |
else: |
27 |
procs = self.call_many(cmdl, actstrl, _in=_in, out=PIPE, |
28 |
universal_newlines=nl) |
29 |
if indata: # only for small amounts of data |
30 |
procs[0].stdin.write(indata) |
31 |
procs[0].stdin.close() |
32 |
res = procs[-1].stdout.read() |
33 |
else: |
34 |
res, eres = procs[-1].communicate() |
35 |
if eres: print('Errors', eres) # XXX |
36 |
except Error as e: |
37 |
raise |
38 |
self.fail('%s [%s]' % (str(e), self.qjoin(cmdl))) |
39 |
finally: |
40 |
for proc in procs: |
41 |
proc.wait() |
42 |
if nl: return filter(None, lcompare.split_rad(res)) |
43 |
return res |
44 |
|
45 |
|
46 |
def test_call_1_text(self): |
47 |
for n in (5, 10, 55, 200,1328,1329,1330,1331,1332, 2000, 5000,99999): |
48 |
exp = range(n) |
49 |
cntcmd = ['cnt', str(n)] |
50 |
res = self._runit(cntcmd, 'run one process', nl=True) |
51 |
resl = [r[0] for r in res] |
52 |
try: lcompare.lcompare(resl, exp) |
53 |
except lcompare.error as e: |
54 |
print(resl, exp) |
55 |
self.fail(('call_one_text n=%d -- ' % n) +str(e)) |
56 |
|
57 |
def test_call_1_bin(self): |
58 |
exp = (2160, 8640, 4320, 1080, 7560) |
59 |
cntcmd = ['total', '-of'] |
60 |
tfn = ts.datafile('histo.dat') |
61 |
res = self._runit(cntcmd, 'run one process', _in=tfn, nl=False) |
62 |
resl = struct.unpack('f'*len(exp), res) |
63 |
try: lcompare.lcompare(resl, exp) |
64 |
except lcompare.error as e: |
65 |
self.fail(('call_one_text n=%d -- ' % n) +str(e)) |
66 |
|
67 |
def test_call_1_noexe(self): |
68 |
# let's hope a program with that name doesn't exist |
69 |
cmd = ['iswaxubdzkcgrhmltqvyfepojn', '-x', '-y', '-z'] |
70 |
self.assertRaises(Error, self._runit, cmd, 'run one process') |
71 |
|
72 |
def test_call_2_text(self): |
73 |
# with values higher than 44721, total will return a float in e-format. |
74 |
for n in (5, 10, 55, 200,1328,1329,1330,1331,1332, 2000, 5000, |
75 |
44721, 44722, 99999): |
76 |
exp = [reduce(operator.add, range(n))] |
77 |
cntcmd = ['cnt', str(n)] |
78 |
totcmd = ['total'] |
79 |
resl = self._runit([cntcmd, totcmd], ['run p1','run p2'], nl=True) |
80 |
try: lcompare.lcompare(resl, [exp]) |
81 |
except lcompare.error as e: |
82 |
self.fail(('call_one_text n=%d -- ' % n) +str(e)) |
83 |
|
84 |
def test_call_2_bin(self): |
85 |
for nn in ((1,1,1), (5,5,5), (3,9,5,2,8)): |
86 |
totalrows = reduce(operator.mul, nn) |
87 |
exp = [reduce(operator.add, range(n))*(totalrows/n) for n in nn] |
88 |
cntcmd = ['cnt'] + [str(n) for n in nn] |
89 |
totcmd = ['total', '-of'] |
90 |
res = self._runit([cntcmd, totcmd], ['run p1','run p2']) |
91 |
resl = struct.unpack('f'*len(exp), res) |
92 |
try: lcompare.lcompare(resl, exp) |
93 |
except lcompare.error as e: |
94 |
self.fail(('call_one_text n=%d -- ' % n) +str(e)) |
95 |
|
96 |
def test_call_many_text(self): |
97 |
data = b'''void polygon test 0 0 9 0 0 0 1 0 0 0 1 0''' |
98 |
exp = (['#', 'xform', '-s', 0.5], |
99 |
['#', 'xform', '-t', 0, 0, 2], |
100 |
['#', 'xform', '-ry', -20], |
101 |
['#', 'xform', '-rx', 30], |
102 |
['void', 'polygon', 'test'], |
103 |
[0], [0], [9], |
104 |
[0, 0, 1], |
105 |
[0.469846310393, 0, 1.17101007167], |
106 |
[-0.0855050358315, 0.433012701892, 1.23492315519]) |
107 |
cmd0 = ['xform', '-rx', '30'] |
108 |
cmd1 = ['xform', '-ry', '-20'] |
109 |
cmd2 = ['xform', '-t', '0', '0', '2'] |
110 |
cmd3 = ['xform', '-s', '0.5'] |
111 |
cmdl = [cmd0, cmd1, cmd2, cmd3] |
112 |
resl = self._runit(cmdl, 'run many processes', indata=data, |
113 |
_in=PIPE, nl=True) |
114 |
try: lcompare.lcompare(resl, exp) |
115 |
except lcompare.error as e: |
116 |
print(resl, exp) |
117 |
self.fail(('call_one_text n=%d -- ' % n) +str(e)) |
118 |
|
119 |
def test_call_many_bin(self): |
120 |
exp = (15, 40, 45, 48, 0) |
121 |
cmd0 = ['cnt', '3', '4', '5', '1'] |
122 |
cmd1 = ['histo', '1', '5'] |
123 |
cmd2 = ['total', '-of'] |
124 |
cmdl = [cmd0, cmd1, cmd2] |
125 |
res = self._runit(cmdl, 'run many processes') |
126 |
resl = struct.unpack('f'*len(exp), res) |
127 |
try: lcompare.lcompare(resl, exp) |
128 |
except lcompare.error as e: |
129 |
print(resl, exp) |
130 |
self.fail(('call_one_text -- ') +str(e)) |
131 |
|
132 |
|
133 |
# vi: set ts=4 sw=4 : |