=Architecture

The compute grid is controlled by a process running a server on port 8082. Each worker process on a workstation makes a connection to the grid controller, advertises which ''messages'' it is able to accept, and then waits for instructions. If the controller goes down, the worker attempts to reconnect.
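
The reconnect behaviour can be sketched as a simple retry loop. This is only an illustration: the function name <code>connect_with_retry</code>, the fixed delay, and the injectable <code>connect</code> and <code>sleep</code> parameters are assumptions made for the example, not part of the grid code.

```python
import socket
import time

def connect_with_retry(host, port, delay=5.0,
                       connect=socket.create_connection, sleep=time.sleep):
    """Keep trying to reach the controller until a connection succeeds.

    Hypothetical helper: `connect` and `sleep` are parameters only so the
    loop can be exercised without a real network.
    """
    while True:
        try:
            return connect((host, port))
        except OSError:
            # Controller is down or unreachable: wait, then try again.
            sleep(delay)
```

A worker built this way would call <code>connect_with_retry("localhost", 8082)</code> on startup and again whenever its connection to the controller is lost.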

The controller has a queue of jobs, each identified by a ''message''. As it cycles through the list of available workers, the controller assigns each idle worker the next compatible job from the queue. If a worker dies, the socket will eventually time out and the controller will push its job back to the head of the queue. The controller's '''run''' loop ends when there are no jobs left to do.
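
The scheduling rule described above can be sketched in a few lines. This is a minimal model, not the code in ''grid.js'': the class name <code>Scheduler</code> and its methods are hypothetical, and networking and socket timeouts are left out.

```python
from collections import deque

class Scheduler:
    """Toy model of the controller's job queue (hypothetical names)."""

    def __init__(self):
        self.jobs = deque()   # pending (message, params) pairs, head first
        self.accepts = {}     # worker id -> set of message names it accepts
        self.running = {}     # worker id -> job currently assigned to it

    def add_worker(self, worker, messages):
        self.accepts[worker] = set(messages)

    def queue(self, message, params=None):
        self.jobs.append((message, params))

    def assign(self):
        """Give each idle worker the first queued job it can accept."""
        assigned = []
        for worker, messages in self.accepts.items():
            if worker in self.running:
                continue  # worker is busy
            for job in self.jobs:
                if job[0] in messages:
                    self.jobs.remove(job)
                    self.running[worker] = job
                    assigned.append((worker, job))
                    break  # stop iterating once the queue has changed
        return assigned

    def worker_died(self, worker):
        """Forget the worker and put its job back at the head of the queue."""
        self.accepts.pop(worker, None)
        job = self.running.pop(worker, None)
        if job is not None:
            self.jobs.appendleft(job)
```

Putting the orphaned job at the head rather than the tail means a job interrupted by a dead worker is retried before newer work.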

The worker code can be implemented either in JavaScript or in Python (inside 4Sight).

The controller sees the grid as a set of worker nodes. On startup, each worker contacts the controller and advertises the names of the messages it is willing to accept; the controller then updates its node database.

=Starting a grid controller

The grid controller starts a server on port 8082 by default:
<source>
  c:\lab>jsdb.exe -load grid.js
  js>grid = new Grid(8082)
  js>grid.queue('4dopen','c:\\amd\\data\\DTM\\EIDP\\h5\\1n.h5',writeln)
  js>grid.run()
</source>
You can dispatch computations to run in parallel. The results are returned as an array:
<source>
  js>grid.parallel("return 5*6","return Math.sin(1.4)","return this.grid.resource('test.txt').readFile().length")
  30,0.9854497299884601,340
</source>
You can also execute the same function across the grid with different parameters:
<source>
  js>grid.evaluate(function(x,y){return x*y}, [ [5,6] , [4,6] , [4,5] , [6,6] ])
  30,24,20,36
</source>

=Messaging interface

Messages to a worker:
<source>
STATUS
DATA name length\n...
MESSAGE something to print on the screen
RUN message_name length\nparameters
QUIT
</source>

Messages from a worker:
<source>
HELLO
IDLE
RUNNING messagename length\n[parameters]
DATA name length\n...
ERROR can't run that job
ACCEPT compute,scale,exec
MESSAGE text to print to stdout
GET name
BUSY can't accept that last job
</source>
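
Several of these messages carry a payload: the header line ends with a byte count, and that many bytes follow the newline. The sketch below shows one way to encode and decode that framing. The function names and the set <code>PAYLOAD_KEYWORDS</code> are assumptions based on the listings above, not taken from the grid source.

```python
import io

# Assumption: these keywords have a header ending in a byte count,
# followed by that many bytes of payload.
PAYLOAD_KEYWORDS = {"DATA", "RUN", "RUNNING"}

def encode(keyword, name, payload=b""):
    """Build 'KEYWORD name length\\n' followed by the raw payload bytes."""
    header = "%s %s %d\n" % (keyword, name, len(payload))
    return header.encode("ascii") + payload

def decode(stream):
    """Read one message from a binary file-like object.

    Returns (keyword, text, payload); payload is empty for plain
    one-line messages such as STATUS or QUIT.
    """
    line = stream.readline().decode("ascii").strip()
    keyword, _, text = line.partition(" ")
    payload = b""
    if keyword in PAYLOAD_KEYWORDS:
        # The last field of the header is the payload length.
        text, _, length = text.rpartition(" ")
        payload = stream.read(int(length))
    return keyword, text, payload
```

With a real connection, <code>decode</code> could be given the object returned by <code>socket.makefile("rb")</code>. Reading the payload by length rather than by line is what allows it to contain newlines.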

=Shared data

Files in the controller's current working directory are available to all workers through the '''GET''' message. Shared data objects are set by sending a '''DATA''' message to the controller and are retrieved with the same '''GET''' message. When a name matches both a file and a shared data object, the file takes precedence. The controller may also preemptively send a resource file with a '''DATA''' message, which the worker then caches.

Communication is asynchronous, so retrieving data takes some care. The ''worker.js'' file contains helper code that handles this:
<source>
shared_data = system.grid.resource('filename.dat')                 //get (binary data stream)
system.grid.share('myobject',{name: 'Shanti', phone: '3-7246'})    //put (object automatically converted to text)
</source>
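
On the controller side, the rule that files are matched before shared data could look roughly like this. It is a sketch under assumptions: the function <code>resolve</code> and the <code>shared</code> dictionary are hypothetical names, and the real controller may store and look up shared objects differently.

```python
import os

def resolve(name, shared, directory="."):
    """Answer a GET request: a file in `directory` wins over shared data.

    `shared` is a dict of objects previously set with DATA messages.
    Returns the bytes to send back, or None if the name is unknown.
    """
    path = os.path.join(directory, name)
    if os.path.isfile(path):
        with open(path, "rb") as f:
            return f.read()
    if name in shared:
        return shared[name]
    return None
```

Checking the file system first means a shared object can never shadow a file of the same name.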

=Worker

Minimal implementation in JavaScript:

<source>
var data = {}  //shared data received from the controller, keyed by name
var grid = new Stream("tcp://localhost:8082")
grid.writeln('HELLO')
grid.writeln('ACCEPT foo,bar')
grid.writeln('IDLE worker ready!')
 
while (1)
{
  let line = grid.readln()
  
  if (!line) continue
  
  if (line == 'QUIT') break
      
  let [,message,text] = line.match(/(\S+)\s*(.*)/)
  
  if(message == 'MESSAGE')
  {
    writeln(text)
  }
  else if (message == 'RUN')
  {
    let [,name,length] = text.match(/(\S*)\s*(.*)/)
    let params = grid.read(Number(length))  //read the parameter payload from the controller
    grid.writeln('RUNNING ',name,' 0')  //header includes a payload length of 0
    
    if (name == 'foo') //no return value
    { 
      grid.writeln('RESULT 0 0')
    }
    else if (name == 'bar') //return value
    { 
      var result = params.length.toString()
      grid.writeln('RESULT 0 ',result.length)
      grid.write(result)
    }
  }
  else if (message =='STATUS')
  {
      grid.writeln('IDLE')
  }
  else if (message == 'DATA')
  {
    let [,name,length] = text.match(/(\S*)\s*(.*)/)
    data[name] = grid.read(Number(length))
  }
  else
  {
    writeln("Unknown instruction ",line)
  }
}

</source>

Minimal implementation in Python:

<source>
import socket
 
def recvcmd(source):
    # Read one newline-terminated command line, one byte at a time
    command = ''
    char = ' '
    while len(char) and ord(char) != 10:
      char = source.recv(1)
      command = command + char
    return command.strip()
 
def recvbytes(source,length):
    # Read exactly `length` bytes of payload
    msg = ''
    while len(msg) < length:
      msg += source.recv(length-len(msg))
    return msg
 
data = {}
grid = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
grid.connect(("192.168.0.11",8082))
grid.send("HELLO\n")
grid.send("ACCEPT foo,bar\n")  # advertise the messages handled below
grid.send("IDLE 4D ready!\n")
 
while 1:
  line = recvcmd(grid)
 
  if line == '':
    continue
 
  if line == 'QUIT':
    break
 
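  # Split off the keyword; commands such as STATUS carry no argument text
  parts = line.split(' ',1) + ['']
  message, text = parts[0], parts[1]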
 
  if message == 'MESSAGE':
    print text
 
  elif message == 'RUN':
    name, length = text.split(' ',1)
    params = recvbytes(grid,int(length))
    grid.send('RUNNING %s 0\n' % name)
    
    if name == 'foo': #no return value
      grid.send('RESULT 0 0\n')
      
    elif name == 'bar': #return value
      answer = "%d" % len(params)
      grid.send('RESULT 0 %d\n' %len(answer))
      grid.send(answer)
      
  elif message == 'STATUS':
    grid.send('IDLE\n')
    
  elif message == 'DATA':
    name, length = text.split(' ',1)
    data[name] = recvbytes(grid,int(length))
    
    
  else:
    print "Unknown instruction %s" % line
</source>
