Coding With Fun
Home Docker Django Node.js Articles Python pip guide FAQ Policy

Julia calculates in parallel


May 14, 2021 Julia


Table of contents


Parallel calculations

Julia provides a messaging-based multiprocessor environment that allows programs to run programs simultaneously on multiple processors using separate memory space.

Julia's messaging is different from environments such as MPI. Communication in Julia is "unilateral", i.e. programmers only need to manage one processor in a dual processor operation.

Parallel programming in Julia is based on two originals: remote references and remote calls. R emote reference object, used to consult objects stored on a specified processor from any processor. R emote call requests that one processor call a function to another (and possibly the same) processor to handle certain parameters. R emote call returns the remote reference object. R emote call is returned immediately; Y ou can call wait on wait to wait for remote call to complete execution, and then fetch full value of the result. Use put store values to remote reference.

Starting julia -p n n processors can be provided n local machine. General n the number of CPU cores on the machine:

$ ./julia -p 2

julia> r = remotecall(2, rand, 2, 2)
RemoteRef(2,1,5)

julia> fetch(r)
2x2 Float64 Array:
 0.60401   0.501111
 0.174572  0.157411

julia> s = @spawnat 2 1 .+ fetch(r)
RemoteRef(2,1,7)

julia> fetch(s)
2x2 Float64 Array:
 1.60401  1.50111
 1.17457  1.15741

remote_call processor index value that is used to perform this operation. M ost parallel programming in Julia does not query the number of specific or available processors, but remote_call a low-level interface for fine control. T he second argument is the function to be called, and the remaining arguments are the parameters of the function. I n this example, let's have processor 2 construct a random matrix of 2x2, and then we add 1 to the result. T he results of the two calculations are saved in two remote references, r and s @spawnat macro evaluates the expression in the second argument on the processor indicated by the first argument.

remote_call_fetch can immediately get the value to be calculated at the far end. It is equivalent to fetch(remote_call(...)) but more efficient than that:

julia> remotecall_fetch(2, getindex, r, 1, 1)
0.10824216411304866

getindex(r,1,1) :ref: equivalent to 等价于 <man-array-indexing> r[1,1] first element of the remote reference r

remote_call is not very convenient. @spawn by using macros, which manipulate expressions rather than functions and automatically select where to evaluate them:

julia> r = @spawn rand(2,2)
RemoteRef(1,1,0)

julia> s = @spawn 1 .+ fetch(r)
RemoteRef(1,1,1)

julia> fetch(s)
1.10824216411304866 1.13798233877923116
1.12376292706355074 1.18750497916607167

Note that here you use 1 .+ fetch(r) 1 .+ r T his is because we don't know where the fetch and r it needs to the processor that adds it. In this example, @spawn smart enough to know that r processors with r fetch will do nothing.

(@spawn @spawn a built-in function, but a Julia definition: 宏 <man-macros>

All processors that execute program code must have access to the program code. F or example, enter:

julia> function rand2(dims...)
         return 2*rand(dims...)
       end

julia> rand2(2,2)
2x2 Float64 Array:
 0.153756  0.368514
 1.15119   0.918912

julia> @spawn rand2(2,2)
RemoteRef(1,1,1)

julia> @spawn rand2(2,2)
RemoteRef(2,1,2)

julia> exception on 2: in anonymous: rand2 not defined 

Process 1 knows rand2 but Process 2 does not. require function automatically loads the source file on all available processors, enabling all processors to run code:

julia> require("myfile")

In a cluster, the contents of the files (and any files recursively loaded) are sent to the entire network. Y ou @everywhere to execute commands on all processors:

julia> @everywhere id = myid()

julia> remotecall_fetch(2, ()->id)
2

@everywhere include("defs.jl")

Files can also be preloaded when multiple processes start, and a driver script can be used to drive calculations:

    julia -p <n> -L file1.jl -L file2.jl driver.jl

Each process has an associated identifier. T he Julia prompt provided by this process always has an id value of 1, just as in the example above, the julia process runs the driver script. T his process, which is defaulted to parallel operations, is workers W hen there is only one process, process 1 is used as a worker. Otherwise, worker refers to all processes except process 1.

Julia has built-in support for two clusters:

  • As shown above, a local cluster specifies the use of —p option.
  • A cluster generation machine uses the --machinefile option. It uses a passwordless ssh julia work process on the specified machine (with the same path as the current host).

Functions addprocs rmprocs workers of course there are other functions available in a cluster that can be added, removed, and queried in a programmable manner.

Other types of clusters can do this by writing their own custom ClusterManager. See the ClusterManagers section.

Data movement

Messaging and data movement are the biggest overheads in parallel computing. Reducing the number of both is critical to performance.

fetch an explicit data movement operation that directly requires the object to be moved to the current machine. @spawn (and related macros) also move data, but not explicitly, and are therefore referred to as implicit data movement operations. Compare the following two ways to construct a random matrix and calculate its squares:

    # method 1
    A = rand(1000,1000)
    Bref = @spawn A^2
    ...
    fetch(Bref)

    # method 2
    Bref = @spawn rand(1000,1000)^2
    ...
    fetch(Bref)

In Method 1, a random matrix is constructed locally and passed on to the processor that does the square calculation. I n Method 2, a random matrix is constructed and squared by the same processor. Therefore, Method 2 moves much less data than Method 1.

Parallel mapping and looping

Most parallel calculations do not require moving data. T he most common is Monte Carlo simulation. T he following example @spawn simulate coin tossing on both processors. S tart by count_heads.jl

    function count_heads(n)
        c::Int = 0
        for i=1:n
            c += randbool()
        end
        c
    end

Simulate on two machines and finally add up the results:

    require("count_heads")

    a = @spawn count_heads(100000000)
    b = @spawn count_heads(100000000)
    fetch(a)+fetch(b)

Iterate independently on multiple processors and then combine their results with some functions. The integrated process is called about simplicity.

In the example above, we explicitly called @spawn which limit parallel calculations to two processors. T o run on any number of processors, you should use a parallel for loop, which should be written in Julia as:

    nheads = @parallel (+) for i=1:200000000
      int(randbool())
    end

This construct implements a pattern of assigning iterations to multiple processors and uses a specific simplicity to synth the results (+)

Note that although parallel for loops look similar to a set of for loops, their behavior is very different. F irst, loops are not in order. S econd, the value written into a variable or array is not globally visible because the iteration runs on a different processor. All variables used in parallel loops are copied and broadcast to each processor.

The following code does not run as expected:

    a = zeros(100000)
    @parallel for i=1:100000
      a[i] = i
    end

If you don't need it, you can omit the approximate operator. H owever, this code does not a all elements of a because there is only one separate part on each processor. S imilar parallel for loops should be avoided. But we can use distributed arrays to circumvent this situation, which we'll talk about later.

If the "external" variable is read-only, you can use it in parallel loops:

    a = randn(1000)
    @parallel (+) for i=1:100000
      f(a[randi(end)])
    end

Sometimes we don't need to be simply, we just want to apply a function to an integer of a range (or an element of a collection). Y ou can then use the parallel mapping pmap function. I n the following example, the singular values of several large random matrices are calculated in parallel:

    M = {rand(1000,1000) for i=1:10}
    pmap(svd, M)

The called function uses pmap for a lot of work and, conversely, @parallel for .

Synchronize with remote references

Scheduling

Julia's parallel programming platform uses tasks (also known as co-programs) that can be switched across multiple calculations. W henever the code performs a communication operation, fetch wait the current task pauses while the scheduler selects another task to run. When the event waits to complete, the task restarts.

For many problems, there is no need to think directly about tasks. H owever, because dynamic scheduling is available, you can wait for multiple events at the same time. I n dynamic scheduling, a program decides what to calculate and where to calculate it, based on when other work is done. This is required for unpredictable or unbalanced work loads, and we can only assign more work processes when they finish their current tasks.

As an example, consider calculating the singular values of matrices of different sizes:

    M = {rand(800,800), rand(600,600), rand(800,800), rand(600,600)}
    pmap(svd, M)

If one process wants to process an 800 x 800 matrix and another 600 x 600 matrix, we won't get a lot of scalability. T he solution is to have local tasks "feed" work in each process as they complete their current tasks. You can see this during the implementation of pmap

    function pmap(f, lst)
        np = nprocs()  # determine the number of processes available
        n = length(lst)
        results = cell(n)
        i = 1
        # function to produce the next work item from the queue.
        # in this case it's just an index.
        nextidx() = (idx=i; i+=1; idx)
        @sync begin
            for p=1:np
                if p != myid() || np == 1 
                    @async begin
                        while true
                            idx = nextidx()
                            if idx > n
                                break
                            end
                            results[idx] = remotecall_fetch(p, f, lst[idx])
                        end
                    end
                end
            end
        end
        results
    end

Only when you run a task @async @spawn similar. W e use it to create a "supply" task for each process. E ach task selects the next exponent that needs to be calculated, waits for its process to complete, and then repeats until the index is used up. N ote that the Supply task only starts when the @sync at which point it abandons control and waits for all local tasks to complete before returning from the function. P rovisioning tasks nextidx() because they all run on the same process. T his process does not require locking because threads are scheduled in real time and not static. This means that the content switch only occurs when it is well defined: in this remotecall_fetch called.

Distributed array

Parallel computing uses a combination of memory resources on multiple machines, so you can use large arrays that cannot be implemented on one machine. At this point, a distributed array can be used, and each processor operates only on the portion of the array it owns.

A distributed array (or global object) is logically a single array, but it is divided into blocks, one saved on each processor. But the operation of the entire array is the same as that of the local array, and parallel calculations are hidden.

Distributed arrays are DArray type. DArray the same element types and Array Array. DArray data is achieved by dividing the index space into small pieces in each dimension.

Some common distributed arrays can be d that begin with d:

    dzeros(100,100,10)
    dones(100,100,10)
    drand(100,100,10)
    drandn(100,100,10)
    dfill(x, 100,100,10)

In the last example, the elements of the array are x the value x. T hese functions automatically select a distribution. I f you want to indicate which process to use and how to distribute the data, write this:

    dzeros((100,100), [1:4], [1,4])

The second parameter specifies that the array should be created in processors 1 through 4. W hen dividing data that contains many processes, people often see diminishing performance benefits. DArrays of processes that allow multiple DArray to be calculated at the same time, and each process has a higher percentage of communication work.

The third parameter specifies a distribution; T he nth element of the array specifies how many blocks should be divided. I n this case, the first dimension is not split, and the second dimension is divided into four blocks. T herefore, the size of each (100,25) Note that distributed arrays must match the number of processes.

distribute(a::Array) be used to convert local arrays to distributed arrays.

localpart(a::DArray) to get the DArray local storage.

localindexes(a::DArray) multi-group of dimension index value ranges stored by the local process.

convert(Array, a::DArray) all the data into the local process.

DArray index value range is used to index SubArray (square brackets), but the data is not copied.

Construct a distributed array

DArray is darray is stated as follows:

    DArray(init, dims[, procs, dist])

init the init function is a multi-group of index value ranges. T his function is a distributed array of local names and is initialized with a specified index value. dims the dimensions of the entire distributed array. procs is optional, indicating a vector with a process ID to use. dist is an integer vector that indicates that distributed arrays should be divided into pieces in each dimension.

The last two parameters are optional, using the default value when ignored.

The following example shows if you change the fill of a local array fill to a constructor of a distributed array:

    dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)

In this example, init function calls fill only for the dimensions of the local block it fill

Distributed array operations

At this time, distributed arrays don't have much functionality. T he main function is to allow communication through array indexes, which is convenient for many problems. A s an example, consider implementing a "life" cell automaton, where cells in each cell grid are updated according to their adjacent cells. E ach process requires cells in its local block that are directly adjacent to each other to calculate the results of an iteration. The following code does this:

    function life_step(d::DArray)
        DArray(size(d),procs(d)) do I
            top   = mod(first(I[1])-2,size(d,1))+1
            bot   = mod( last(I[1])  ,size(d,1))+1
            left  = mod(first(I[2])-2,size(d,2))+1
            right = mod( last(I[2])  ,size(d,2))+1

            old = Array(Bool, length(I[1])+2, length(I[2])+2)
            old[1      , 1      ] = d[top , left]   # left side
            old[2:end-1, 1      ] = d[I[1], left]
            old[end    , 1      ] = d[bot , left]
            old[1      , 2:end-1] = d[top , I[2]]
            old[2:end-1, 2:end-1] = d[I[1], I[2]]   # middle
            old[end    , 2:end-1] = d[bot , I[2]]
            old[1      , end    ] = d[top , right]  # right side
            old[2:end-1, end    ] = d[I[1], right]
            old[end    , end    ] = d[bot , right]

            life_rule(old)
        end
    end

As you can see, we use a series of index expressions to get the array old in a local old N ote that do syntax facilitates the passing of init DArray constructor. N ext, the continuous life_rule to provide update rules for the data, resulting in the DArray block. life_rule nothing DArray-specific but for the sake of integrity, we'll list it here:

    function life_rule(old)
        m, n = size(old)
        new = similar(old, m-2, n-2)
        for j = 2:n-1
            for i = 2:m-1
                nc = +(old[i-1,j-1], old[i-1,j], old[i-1,j+1],
                       old[i  ,j-1],             old[i  ,j+1],
                       old[i+1,j-1], old[i+1,j], old[i+1,j+1])
                new[i-1,j-1] = (nc == 3 || nc == 2 && old[i,j])
            end
        end
        new
    end

Shared array (for experimentation, only on unix)

Shared arrays use systems that share memory in many processes to map the same array. A lthough DArray SharedArray differently. I n a DArray process can access only one piece of data locally, and the two processes share the same piece; I n SharedArray each "participating" process has access to the entire array. SharedArray is a good choice when you want to access two or more processes together with a large amount of data on the same machine.

SharedArray (allocation and access values) work like regular arrays and are very efficient because their underlying memory is available for local processes. A s a result, most algorithms run SharedArrays even in single-process mode. W hen an algorithm must have an Array the SharedArray sdata(S) For other AbstractArray sdata itself, so it is sdata any array type.

In the form of sharing the number of numeric constructors:

  SharedArray(T::Type, dims::NTuple; init=false, pids=Int[])

Create a shared array that is specified by the pids process, bitstype is T and is the size of dims Unlike distributed arrays, shared arrays can only be used for pid these participants (if on the same host, the same is true of the creation process).

If an init function initfn(S::SharedArray) is specified, it is called by all participants. You can control it, and each worker can run the init function in init the array, so it is initialized in parallel.

Here's a simple example:

  julia> addprocs(3)
  3-element Array{Any,1}:
   2
   3
   4

  julia> S = SharedArray(Int, (3,4), init = S -> S[localindexes(S)] = myid())
  3x4 SharedArray{Int64,2}:
   2  2  3  4
   2  3  3  4
   2  3  4  4

  julia> S[3,2] = 7
  7

  julia> S
  3x4 SharedArray{Int64,2}:
   2  2  3  4
   2  3  3  4
   2  7  4  4

localindexes a range of one-dimensional indexes that do not intersect, and it sometimes facilitates task communication between processes. Of course, you can divide your work the way you want:

  julia> S = SharedArray(Int, (3,4), init = S -> S[myid()-1:nworkers():length(S)] = myid())
  3x4 SharedArray{Int64,2}:
   2  2  2  2
   3  3  3  3
   4  4  4  4

Because all processes have access to the underlying data, you must be careful not to set conflicts. For example:

  @sync begin
      for p in workers()
          @async begin
              remotecall_wait(p, fill!, S, p)
          end
      end
  end

This can lead to undefined behavior: because each process pid fill the entire array, his pid is preserved regardless of which process S pid

ClusterManagers

Julia's work processes can also be generated on any machine, allowing Julia's natural parallel functionality to run very transparently in a clustered environment. ClusterManager provides a way to specify the means to start and manage the process. For example, ssh cluster also uses ClusterManager implement:

    immutable SSHManager <: ClusterManager
        launch::Function
        manage::Function
        machines::AbstractVector

        SSHManager(; machines=[]) = new(launch_ssh_workers, manage_ssh_workers, machines)
    end

    function launch_ssh_workers(cman::SSHManager, np::Integer, config::Dict)
        ...
    end

    function manage_ssh_workers(id::Integer, config::Dict, op::Symbol)
        ...
    end

launch_ssh_workers responsible for instantiation of new Julia manage_ssh_workers a way to manage those processes, such as sending interrupt signals. You can add a new addprocs

    addprocs(5, cman=LocalManager())

to specify that a batch of processes be ClusterManager is used to start those processes.

Footnote

In this article, MPI refers to the MPI-1 standard. S tarting with MPI-2, the MPI Standards Committee introduced a new set of communication mechanisms, collectively known as Remote Memory Access (RMA). T he motivation for adding RMA MPI standards is to improve unilateral communication patterns. For more information on the latest MPI standards, see http://www.mpi-forum.org/docs.