# First experience with distributed computation on arrays

In [17]:
using Distributed

addprocs(4)

4-element Vector{Int64}:
 6
 7
 8
 9

In [18]:
@everywhere function maxnum_serial(a,s,e)
                if s==e
                  a[s]
         else
                   mid = div((s+e),2)
                   low = maxnum_serial(a,s,mid)
                   high = maxnum_serial(a,mid+1,e)
                   low >high ? low : high
                end
       end

In [19]:
@everywhere function maxnum_parallel(a,s,e)
                if (e-s)<=10000000
                  maxnum_serial(a,s,e)
               else
                   mid = div((s+e),2)
                   low_remote = @spawn maxnum_parallel(a,s,mid)
                   high = maxnum_parallel(a,mid+1,e)
                   low = fetch(low_remote)
                   low > high ? low : high
                end
       end

In [20]:
n=1024;
a=rand(n);

In [21]:
maxnum_serial(a,1,n)

0.9953089443669203

In [22]:
n = 20000000
a=rand(n);

In [23]:
@time maxnum_serial(a,1,n)

  0.095465 seconds (1 allocation: 16 bytes)


0.9999999719449176

As we can see, the parallel version runs slower than its serial counterpart.

In [24]:
@time maxnum_parallel(a,1,n)

  0.635260 seconds (51.28 k allocations: 2.714 MiB, 2.37% compilation time)


0.9999999719449176

Indeed, the amount of work (number of comparisons) is in the same order of magnitude of data transfer (number of integers to move from one processor than another). But the latter costs much more clock-cycles.

In [25]:
@everywhere function minimum_maximum_serial(a,s,e)
                if s==e
                  [a[s], a[s]]
         else
                   mid = div((s+e),2)
                   X = minimum_maximum_serial(a,s,mid)
                   Y = minimum_maximum_serial(a,mid+1,e)
                   [min(X[1],Y[1]), max(X[2],Y[2])]
                end
       end

In [26]:
@everywhere function minimum_maximum_parallel(a,s,e)
                if (e-s)<=10000000
                  minimum_maximum_serial(a,s,e)
               else
                   mid = div((s+e),2)
                   R = @spawn minimum_maximum_parallel(a,s,mid)
                   Y = minimum_maximum_parallel(a,mid+1,e)
                   X = fetch(R)
                   [min(X[1],Y[1]), max(X[2],Y[2])]
                end
       end

In [27]:
n = 20000000
a=rand(n);

In [28]:
@time minimum_maximum_serial(a,1,n)

  1.885921 seconds (40.01 M allocations: 2.981 GiB, 18.28% gc time, 0.67% compilation time)


2-element Vector{Float64}:
 3.3229708207294095e-8
 0.9999999533005469

As we can see below, the parallel version is faster now than its serial counterpart. Should we be satidified with this result?

In [29]:
@time minimum_maximum_parallel(a,1,n)

  1.504330 seconds (20.05 M allocations: 1.493 GiB, 7.34% gc time, 0.91% compilation time)


2-element Vector{Float64}:
 3.3229708207294095e-8
 0.9999999533005469

No, since computing serially the minimum alone is about 10 times faster

# First experience with parallel mergesort

We start with an in-place versio of mergesort

In [30]:
function mergesort(data, istart, iend)
                      if(istart < iend)
                              mid = (istart + iend) >>>1
                              mergesort(data, istart, mid)
                              mergesort(data, mid+1, iend)
                                 merge(data, istart, mid, iend)
                      end
              end

mergesort (generic function with 1 method)

In [31]:
function merge( data, istart, mid, iend)
                      n = iend - istart + 1
                      temp = zeros(n)
                      s = istart
                      m = mid+1
                      for tem = 1:n
                              if s <= mid && (m > iend || data[s] <= data[m])
                                      temp[tem] = data[s]
                                      s=s+1
                              else
                                      temp[tem] = data[m]
                                      m=m+1
                              end
                      end
                      data[istart:iend] = temp[1:n]
              end

merge (generic function with 1 method)

In [32]:
n = 1000000
A = [rem(rand(Int32),10) for i =1:n];
@time mergesort(A, 1, n);

  0.280544 seconds (2.04 M allocations: 427.254 MiB, 10.86% gc time, 10.41% compilation time)


Towards parallel code and pretending that we do not know about SharedArrays, we write an uut-of-place serial merge sort

In [33]:
function mergesort(data, istart, iend)
                      if(istart < iend)
                              mid = div((istart + iend),2)
                              a = mergesort(data, istart, mid)
                              b = mergesort(data,mid+1, iend)
                              c = merge(a, b, istart, mid, iend)
                      else
                          [data[istart]]
                      end
              end              

mergesort (generic function with 1 method)

In [34]:
@everywhere function merge(a, b, istart, mid, iend)
                      n = iend - istart + 1
                      nb = iend - mid
                      na = mid - istart + 1
                      c = zeros(n)
                      s = 1
                      m = 1
                      for tem = 1:n
                              if s <= na && (m > nb || a[s] <= b[m])
                                      c[tem] = a[s]
                                      s= s+1
                              else
                                      c[tem] = b[m]
                                      m=m+1
                              end
                      end
                      c
              end


In [35]:
n = 1000000;
A = [rem(rand(Int32),10) for i =1:n];
@time mergesort(A, 1, n);

  0.241117 seconds (2.06 M allocations: 276.735 MiB, 7.09% gc time, 28.88% compilation time)


In [36]:
@everywhere function mergesort_serial(data, istart, iend)
                      if(istart < iend)
                              mid = div((istart + iend),2)
                              a = mergesort_serial(data, istart, mid)
                              b = mergesort_serial(data,mid+1, iend)
                              c = merge(a, b, istart, mid, iend)
                      else
                          [data[istart]]
                      end
              end

In [37]:
@everywhere function mergesort_parallel(data, istart, iend)
                      if(iend - istart <= 2500000)
                              mergesort_serial(data, istart, iend)
                       else
                              mid = div((istart + iend),2)
                              a = @spawn mergesort_parallel(data, istart, mid)
                              b = mergesort_parallel(data,mid+1, iend)
                              c = merge(fetch(a), b, istart, mid, iend)
                      end
              end

In [38]:
n = 10000000;
A = [rem(rand(Int32),10) for i =1:n];
@time mergesort_serial(A, 1, n);

  2.031609 seconds (20.01 M allocations: 2.909 GiB, 13.08% gc time, 0.39% compilation time)


In [39]:
@time mergesort_parallel(A, 1, n);

  1.727158 seconds (5.08 M allocations: 884.767 MiB, 5.29% gc time, 0.87% compilation time)


So we got some speedup! But not that great!
EXERCISE: Improve the performance of he above code
HINT: Use SharedArrays

# Matrix Matrix Multiplication

![Blocked matrix multiplication](4-29.png)


In [40]:
function dacmm(i0, i1, j0, j1, k0, k1, A, B, C, n, basecase)
           ## A, B, C are matrices
           ## We compute C = A * B

           if n > basecase
              n = div(n, 2)
              dacmm(i0, i1, j0, j1, k0, k1, A, B, C, n, basecase)
              dacmm(i0, i1, j0, j1+n, k0, k1+n, A, B, C, n, basecase)
              dacmm(i0+n, i1, j0, j1, k0+n, k1, A, B, C, n, basecase)
              dacmm(i0+n, i1, j0, j1+n, k0+n, k1+n, A, B, C, n, basecase)
              dacmm(i0, i1+n, j0+n, j1, k0, k1, A, B, C, n, basecase)
              dacmm(i0, i1+n, j0+n, j1+n, k0, k1+n, A, B, C, n, basecase)
              dacmm(i0+n, i1+n, j0+n, j1, k0+n, k1, A, B, C, n, basecase)
              dacmm(i0+n, i1+n, j0+n, j1+n, k0+n, k1+n, A, B, C, n, basecase)
           else
             for i= 1:n, j=1:n, k=1:n
                 C[i+k0,k1+j] = C[i+k0,k1+j] + A[i+i0,i1+k] * B[k+j0,j1+j]
             end
           end
       end

dacmm (generic function with 1 method)

In [41]:
n=4
basecase = 2
A = [rem(rand(Int32),5) for i =1:n, j = 1:n]
B = [rem(rand(Int32),5) for i =1:n, j = 1:n]
C = zeros(Int32,n,n);
dacmm(0, 0, 0, 0, 0, 0, A, B, C, n, basecase)
A * B - C

4×4 Matrix{Int64}:
 0  0  0  0
 0  0  0  0
 0  0  0  0
 0  0  0  0

In [42]:
function test_dacmm(n, basecase)
	 A = [rem(rand(Int32),5) for i =1:n, j = 1:n]
	 B = [rem(rand(Int32),5) for i =1:n, j = 1:n]
	 C = zeros(Int32,n,n);
	 @time dacmm(0, 0, 0, 0, 0, 0, A, B, C, n, basecase)
	 if n < 1024
	 return A * B - C
	 else
	 @assert A* B == C
	 end
end

test_dacmm (generic function with 1 method)

In [43]:
test_dacmm(4, 2)

  0.000001 seconds


4×4 Matrix{Int64}:
 0  0  0  0
 0  0  0  0
 0  0  0  0
 0  0  0  0

In [44]:
test_dacmm(16, 4)

  0.000009 seconds


16×16 Matrix{Int64}:
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0

In [45]:
test_dacmm(256, 16)

  0.026412 seconds


256×256 Matrix{Int64}:
 0  0  0  0  0  0  0  0  0  0  0  0  0  …  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  …  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  …  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0   

In [46]:
test_dacmm(1024, 2)

  4.285429 seconds


In [47]:
[test_dacmm(1024,2^i) for i=1:10]

  4.193990 seconds
  2.410697 seconds
  1.822969 seconds
  1.612543 seconds
  1.517893 seconds
  2.063892 seconds
  2.211178 seconds
  2.336252 seconds
  2.306678 seconds
  2.184524 seconds


10-element Vector{Nothing}:
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing

In [48]:
@everywhere function dacmm_parallel(i0, i1, j0, j1, k0, k1, A, B, C, n, basecase)
   l = []
   if n > basecase && nprocs() > 3
    n = div(n,2)
    lrf = [
       @spawnat procs()[1] dacmm_parallel(i0, i1, j0, j1, k0, k1, A, B, C, n,basecase),
       @spawnat procs()[2] dacmm_parallel(i0, i1, j0, j1+n, k0, k1+n, A, B, C, n,basecase),
       @spawnat procs()[3] dacmm_parallel(i0+n, i1, j0, j1, k0+n, k1, A, B, C, n,basecase),
       @spawnat procs()[4] dacmm_parallel(i0+n, i1, j0, j1+n, k0+n, k1+n, A, B, C, n, basecase)
       ]
    ## pmap(fetch, lrf)
    l = [l..., map(fetch, lrf)]
    lrf = [
       @spawnat procs()[1] dacmm_parallel(i0, i1+n, j0+n, j1, k0, k1, A, B, C, n,basecase),
       @spawnat procs()[2] dacmm_parallel(i0, i1+n, j0+n, j1+n, k0, k1+n, A, B, C, n, basecase),
       @spawnat procs()[3] dacmm_parallel(i0+n, i1+n, j0+n, j1, k0+n, k1, A, B, C, n, basecase),
       @spawnat procs()[4] dacmm_parallel(i0+n, i1+n, j0+n, j1+n, k0+n, k1+n, A, B, C, n, basecase)
       ]
    ## pmap(fetch, lrf)
    l = [l..., map(fetch, lrf)]
   else
    for i= 1:n, j=1:n, k=1:n
        C[i+k0,k1+j] += A[i+i0,i1+k] * B[k+j0,j1+j]
    end
   end
   return l
end

In [49]:
using SharedArrays

@everywhere n = 4
a = SharedArray{Int32}(n,n)
for i = 1:n, j = 1:n a[i,j]=rem(rand(Int64),5) end
b = SharedArray{Int32}(n,n)
for i = 1:n, j = 1:n b[i,j]=rem(rand(Int64),5) end
c = SharedArray{Int32}(n,n)
dacmm_parallel(0,0,0,0,0,0,a,b,c,n,n)
c - a * b

4×4 Matrix{Int32}:
 0  0  0  0
 0  0  0  0
 0  0  0  0
 0  0  0  0

In [50]:
c = SharedArray{Int32}(n,n)
dacmm_parallel(0,0,0,0,0,0,a,b,c,n,div(n,2))
c - a * b


4×4 Matrix{Int32}:
 0  0  -4  -3
 0  0  -9  -1
 3  7  -2   1
 6  2   5   5

In [51]:
function test_dacmm_parallel(n, basecase)
	 a = SharedArray{Int32}(n,n)
	 for i = 1:n, j = 1:n a[i,j]=rem(rand(Int32),5) end
	 b = SharedArray{Int32}(n,n)
	 for i = 1:n, j = 1:n b[i,j]=rem(rand(Int32),5) end
	 c = SharedArray{Int32}(n,n)
	 @time dacmm_parallel(0,0,0,0,0,0,a,b,c,n,basecase)
	 if n < 128
	    sleep(2)
	    return a * b - c
	 end
end

test_dacmm_parallel (generic function with 1 method)

In [52]:
@everywhere n = 16
test_dacmm_parallel(n,n)
test_dacmm_parallel(n,div(n,2))
test_dacmm_parallel(n,div(n,4))


  0.000022 seconds (1 allocation: 48 bytes)
  0.000854 seconds (389 allocations: 22.609 KiB)
  0.579902 seconds (1.37 M allocations: 73.912 MiB, 98.69% compilation time)


16×16 Matrix{Int32}:
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0

In [53]:
@everywhere n = 64
test_dacmm_parallel(n,n)
test_dacmm_parallel(n,div(n,2))
test_dacmm_parallel(n,div(n,4))
test_dacmm_parallel(n,div(n,8))

  0.000962 seconds (1 allocation: 48 bytes)
  0.002155 seconds (409 allocations: 23.750 KiB)
  0.004459 seconds (1.45 k allocations: 215.828 KiB)
  0.205489 seconds (511.46 k allocations: 27.604 MiB, 97.14% compilation time)


64×64 Matrix{Int32}:
 0  0  0  0  0  0  0  0  0  0  0  0  0  …  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  …  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0  …  0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     0  0  0  0  0  0  0  0  0  0  0  0
 0  0  0  0  0  0  0  0  0  0  0  0  0     

In [54]:
@everywhere n = 128
[test_dacmm_parallel(n,2^i) for i=2:6]

  0.408870 seconds (1.13 M allocations: 60.711 MiB, 5.48% gc time, 88.18% compilation time)
  0.002034 seconds (6.52 k allocations: 368.562 KiB)
  0.001030 seconds (2.86 k allocations: 160.641 KiB)
  0.000859 seconds (1.23 k allocations: 69.406 KiB)
  0.001995 seconds (411 allocations: 23.109 KiB)


5-element Vector{Nothing}:
 nothing
 nothing
 nothing
 nothing
 nothing

In [55]:
@everywhere n = 256
[test_dacmm_parallel(n,2^i) for i=3:7]

  0.004025 seconds (12.70 k allocations: 716.297 KiB)
  0.002095 seconds (6.14 k allocations: 345.844 KiB)
  0.001808 seconds (2.87 k allocations: 160.922 KiB)
  0.003933 seconds (1.24 k allocations: 69.297 KiB)
  0.014451 seconds (412 allocations: 24.172 KiB)


5-element Vector{Nothing}:
 nothing
 nothing
 nothing
 nothing
 nothing

In [56]:
@everywhere n = 1024
[test_dacmm_parallel(n,2^i) for i=4:10]

  0.099257 seconds (170.44 k allocations: 9.517 MiB, 88.49% compilation time)
  0.008844 seconds (13.11 k allocations: 736.344 KiB)
  0.021495 seconds (6.61 k allocations: 366.250 KiB)
  0.087802 seconds (3.38 k allocations: 185.094 KiB)
  0.354920 seconds (4.75 k allocations: 238.375 KiB)
  1.424552 seconds (7.66 k allocations: 377.484 KiB)
  5.834110 seconds (1 allocation: 48 bytes)


7-element Vector{Nothing}:
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing

In [57]:
[test_dacmm(n,2^i) for i=3:10]

  1.991816 seconds
  1.791570 seconds
  1.651079 seconds
  2.378351 seconds
  2.332869 seconds
  2.504715 seconds
  2.430819 seconds
  2.428149 seconds


8-element Vector{Nothing}:
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing