1

Тема: Передача даних між потоками

Привіт. Пишу клієнтську частину для ММО ЕрПоГе. Використовую двигун Unity3D, в принципі там той самий С# але структура скриптів своя. Коротше, дивіться яка схема. Для початку - в Юніті є свої методи, котрі можна запускати лишень в головному потоці. Ці методи відповідають за роботу з об'єктами в сцені  і так далі, наприклад,  створення в сцені об'єкту та присвоєння йому координат  необхідно робити в головному потоці. В скриптах Юніті є декілька важливих методів, це void Start, котрий являється чимось типу конструктора, він виконується до виконання інших скриптів, void Update, це метод, котрий виконується кожен фрейм. Так от, я використовую асинхронні сокети для взаємодії сервера та клієнта.  І, очевидно, данні приймаються методом, котрий виконується ну ніяк не в головному потоці. Тоді як ці дані передати в головний потік? Ну я зробив просто, створив List<byte[]>, і коли асинхронний метод приймає дані, то я заношу ці дані  в List, а в Update перевіряю, якщо в List щось є, то я витягую ці дані, та передаю на обробку в інший метод, а в тому методі, по закінченню всіх справ - я видаляю оброблені дані з List. Так от, ця схема і працює, але не ідеально, тому що не всі дані, котрі прийняв асинхронний метод, обробляються в методі, котрий викликається в Update. Яким ще чином можна організувати таку обробку даних? Ось код, щоб було зрозуміліше, зверніть увагу на методи ReceiveCallback та Update, перший приймає дані та заносить в чергу на обробку, а другий вже обробляє.
p.s. це не windows forms, тому ніякі Invoke не спрацюють.

using UnityEngine;
using System.Collections;
using System.Net;
using System.Net.Sockets;
using System;
using System.Text;
using System.Linq;
using System.Collections.Generic;
 
public class client : MonoBehaviour {
       
        private GameObject player; 
        private Socket socket; 
        private byte[] buff; 
        private List<byte[]> listRec; 
        private characters iAm;
        //private List<byte[]> listSend;
        void Start () {
                socket=globalConnection.s;
                player = (GameObject)GameObject.FindGameObjectWithTag("Player"); 
                buff = new byte[4096]; 
                listRec=new List<byte[]>(); 
                //listSend=new List<byte[]>();
                iAm=globalConnection.activePlayer;
                SetActivePlayer(iAm);
                CreatePlayers(); 
        }
       
        void Update()
        {
                if(listRec.Count>0) 
                {
                        lock(listRec){
                        byte[] bf = listRec[listRec.Count-1]; 
                        ExecuteRec(bf); 
                        }
                }
                /*Debug.Log("count of global GO "+globalConnection.players.Count);
                foreach(var c in globalConnection.players)
                        Debug.Log(c);
                /*if(listSend.Count>0)
                {
                        StartCoroutine("ExecuteSend",listSend[listSend.Count-1]);
                }*/
        }
       
        byte[] PackPos()
        {
                float x,y,z,rx,ry,rz,rw;
                byte[] bf = new byte[64];
                int index=4; 
               
                int char_id=iAm.id; 
               
                x=player.transform.position.x;
                y=player.transform.position.y;
                z=player.transform.position.z;
               
                rx=player.transform.rotation.x;
                ry=player.transform.rotation.y;
                rz=player.transform.rotation.z;
                rw=player.transform.rotation.w;
               
                BitConverter.GetBytes(char_id).CopyTo(bf,index);
               
                BitConverter.GetBytes(x).CopyTo(bf,index+4);
                BitConverter.GetBytes(y).CopyTo(bf,index+8);
                BitConverter.GetBytes(z).CopyTo(bf,index+12);
               
                BitConverter.GetBytes(rx).CopyTo(bf,index+16);
                BitConverter.GetBytes(ry).CopyTo(bf,index+20);
                BitConverter.GetBytes(rz).CopyTo(bf,index+24);
                BitConverter.GetBytes(rw).CopyTo(bf,index+28);
               
                Encoding.UTF8.GetBytes("man").CopyTo(bf,0); 
                return bf;
        }
       
        void ExecuteRec(byte[] b)
        {
                string req = Encoding.UTF8.GetString(b,0,3);
                switch(req)
                {
                case "man" :
                        SetMain(b); 
                        Debug.Log("Receive man");
                        break;
                case "act" :
                        CreateChar(b);
                        Debug.Log("Act");
                        break;
                }
                lock(listRec)
                {
                listRec.Remove(b); 
                }               //yield return 0;
 
        }
       
        IEnumerator Sending()
        {
                while(true){ 
                byte[] bf = PackPos(); 
                        try{
                socket.BeginSend(bf,0,bf.Length,SocketFlags.None,new AsyncCallback(SendCallback),socket);
                        }
                        catch(Exception e)
                        {
                                Debug.Log("Sending "+e.Message);
                        }
                yield return new WaitForSeconds(1); 
                }
        }
       
        void CreatePlayers()
        {
                if(globalConnection.activePlayers.Count>0) 
                {
                        GameObject go = (GameObject)Resources.Load("Players"); 
                        foreach(var pl in globalConnection.activePlayers)
                        {
                                GameObject g = (GameObject)Instantiate(go); 
                                PlayerManager pm = (PlayerManager)g.GetComponent("PlayerManager"); 
                                pm.Construct(pl.name,pl.id,pl.x,pl.y,pl.z); 
                                globalConnection.players.Add(pm);                        }
                }
                socket.BeginReceive(buff,0,buff.Length,SocketFlags.None,new AsyncCallback(ReceiveCallback),socket); 
                StartCoroutine("Sending");
        }
        void CreateChar(byte[] b)
        {
 
                float x = BitConverter.ToSingle(b,20);
                float y = BitConverter.ToSingle(b,24);
                float z = BitConverter.ToSingle(b,28);
                string nick = Encoding.UTF8.GetString(b,10,20);
                int id = BitConverter.ToInt32(b,4);
                GameObject go = (GameObject)Resources.Load("Players");
                lock(globalConnection.players)
                {
                        GameObject g = (GameObject)Instantiate(go);
                        PlayerManager pm = (PlayerManager)g.GetComponent("PlayerManager");
                        pm.Construct(nick,id,x,y,z);
                        globalConnection.players.Add(pm);
                }
        }
        void SetMain(byte[] b)
        {      
                int char_id= BitConverter.ToInt32(b,4); 
                Debug.Log("char_id = "+char_id);
                foreach(var pm in globalConnection.players) 
                {
                        //PlayerManager pm = (PlayerManager)ch.GetComponent("PlayerManager");
                        if(pm.id==char_id)
                        {
                                int index=4;
                        float x = BitConverter.ToSingle(b,index+4);
                        float y = BitConverter.ToSingle(b,index+8);
                        float z = BitConverter.ToSingle(b,index+12);
                        float rx = BitConverter.ToSingle(b,index+16);
                        float ry = BitConverter.ToSingle(b,index+20);
                        float rz = BitConverter.ToSingle(b,index+24);
                        float rw = BitConverter.ToSingle(b,index+28);
                                pm.transform.position=new Vector3(x,y,z);
                                pm.transform.rotation=new Quaternion(rx,ry,rz,rw);
                                Debug.Log("Set: x="+x+" y="+y+" z="+z);
                        }
                }
 
        }
       
        void SetActivePlayer(characters ch)
        {
                byte[] bf = new byte[32];
                Encoding.UTF8.GetBytes("act").CopyTo(bf,0);
                BitConverter.GetBytes(ch.id).CopyTo(bf,3);
                Encoding.UTF8.GetBytes(ch.name).CopyTo(bf,10);
                socket.BeginReceive(buff,0,buff.Length,SocketFlags.None,new AsyncCallback(ReceiveCallback),socket);
                socket.BeginSend(bf,0,bf.Length,SocketFlags.None,new AsyncCallback(SendCallback),socket);
        }
       
        void SendCallback(IAsyncResult ar)
        {
                Socket s = (Socket)ar.AsyncState;
                try{
                        s.EndSend(ar);
                }
                catch(Exception e)
                {
                        Debug.Log(e.Message);
                }
        }
       
        void ReceiveCallback(IAsyncResult ar)
        {
                Debug.Log("Receive from client");
                Socket s = (Socket)ar.AsyncState;
                string ss = Encoding.UTF8.GetString(buff,0,3);
                try{
                        int n = s.EndSend(ar);
                        lock(listRec){
                        listRec.Add(buff);
                        }
                        Debug.Log("ReceiveCallback "+n+" with command = "+ss); 
                }
                catch(Exception e)
                {
                        Debug.Log("ReceiveCallback "+e.Message);
                        s.Close();
                }
                s.BeginReceive(buff,0,buff.Length,SocketFlags.None,new AsyncCallback(ReceiveCallback),s);
        }
 
}

2

Re: Передача даних між потоками

Наскільки мені відомо, головні обчислення мають виконуватися в головному потоці за допомогою калбека. Для OpenGL це Idle функція.

3

Re: Передача даних між потоками

Очі.завидющі написав:

Наскільки мені відомо, головні обчислення мають виконуватися в головному потоці за допомогою калбека. Для OpenGL це Idle функція.

Головні обчислення чого? і за допомогою якого калбека? І при чому тут OpengGL?

4

Re: Передача даних між потоками

Це зветься "черга подій" (event queue). А вам радять, що, можливо, краще додавати до неї не масиви байтів, а делегати функцій (посилання на функції, колбеки), які будуть обробляти отримані дані. Тоді обробник взагалі тупий: якщо черга непуста - виконати перший колбек, якщо пуста - спати до наступного кадра.

5

Re: Передача даних між потоками

koala написав:

Це зветься "черга подій" (event queue). А вам радять, що, можливо, краще додавати до неї не масиви байтів, а делегати функцій (посилання на функції, колбеки), які будуть обробляти отримані дані. Тоді обробник взагалі тупий: якщо черга непуста - виконати перший колбек, якщо пуста - спати до наступного кадра.

да я теж про це колись думав і наробив купу тих делегатів, але так і не в'їхав, як їх викорисовувати, можете надати простий приклад з чергою та делегатами?

6

Re: Передача даних між потоками

А які в тому Unity3D є об’єкти синхронізації? Є там критичні секції, чи може списки, спеціально заточені під багатопоточність?

7

Re: Передача даних між потоками

Torbins написав:

А які в тому Unity3D є об’єкти синхронізації? Є там критичні секції, чи може списки, спеціально заточені під багатопоточність?

ну я нічо про таке не чув.

8

Re: Передача даних між потоками

koala написав:

Це зветься "черга подій" (event queue). А вам радять, що, можливо, краще додавати до неї не масиви байтів, а делегати функцій (посилання на функції, колбеки), які будуть обробляти отримані дані. Тоді обробник взагалі тупий: якщо черга непуста - виконати перший колбек, якщо пуста - спати до наступного кадра.

я шось не пойняв, делегати яких функцій тре додавати? Я от тренуюсь, метод в Ліст запихнути можу, і викликати його потім в головному потоці, але як запихнути в Ліст метод з данними? ось так не працює

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MySql.Data.MySqlClient;
using System.Threading;

namespace test
{
    class Program
    {
        static List<Del> list = new List<Del>(); 
        static void Main(string[] args)
        {
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
            Thread t = new Thread(Thread1);
            t.Start();
            bool tr = true;
            while (tr)
            {
                if (list.Count > 0)
                {

                    Del d = list[0];
                    d();
                    tr = false;
                }
            }
                  Console.Read();

        }


        static void Meth(string s)
        {
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId+" s = "+s);
        }

      delegate void Del(string s);
        

             static void Thread1()
             {
                 Del d = Meth;
                 list.Add(d("вася лох"));

             }
    }


}

9

Re: Передача даних між потоками

Делегат - це (умовно) посилання на функцію. Можете сприймати це як ім'я функції: ми збираємо список імен функцій, які потім в певних умовах (в головному потоці) будуть викликані. Відповідно, у делегата (ще) нема параметрів. Ви можете створити свій клас, що мітитиме делегата і параметри, складати з них список, а потім викликати; можете складати дані в окреме місце, звідки ваші делегати їх братимуть; можете придумати ще щось.

                 Del d = Meth;
                 list.Add(d("вася лох"));

Це взагалі не працює. d("") - це виклик функції, і в список заноситься її результат. Вам треба додавати

list.Add(d);

Я мав на увазі щось таке. Основний цикл:

while (running)//якась нелокальна для функції змінна, її вимикає один з делегатів
{
  lock(listDel)//щоб інші потоки не завадили
  {
     while (listDel.Count > 0)
     {
       listDel[0]();
       listDel.RemoveAt(0);
    }
  }
}

створення делегата:

void ReceiveCallback(IAsyncResult ar)
{
  ...//отримуємо дані
  lock(listData)
  {
    lock(listDel)
    {
      listDel.Add(ім'я_делегата);
      listData.Add(дані);
    }
  }
  ...
}

Типова функція для делегата:

void CreateChar()
{
  //дані беремо з listData
  ...
}

10

Re: Передача даних між потоками

koala написав:

Делегат - це (умовно) посилання на функцію. Можете сприймати це як ім'я функції: ми збираємо список імен функцій, які потім в певних умовах (в головному потоці) будуть викликані. Відповідно, у делегата (ще) нема параметрів. Ви можете створити свій клас, що мітитиме делегата і параметри, складати з них список, а потім викликати; можете складати дані в окреме місце, звідки ваші делегати їх братимуть; можете придумати ще щось.

                 Del d = Meth;
                 list.Add(d("вася лох"));

Це взагалі не працює. d("") - це виклик функції, і в список заноситься її результат. Вам треба додавати

list.Add(d);

Я мав на увазі щось таке. Основний цикл:

while (running)//якась нелокальна для функції змінна, її вимикає один з делегатів
{
  lock(listDel)//щоб інші потоки не завадили
  {
     while (listDel.Count > 0)
     {
       listDel[0]();
       listDel.RemoveAt(0);
    }
  }
}

створення делегата:

void ReceiveCallback(IAsyncResult ar)
{
  ...//отримуємо дані
  lock(listData)
  {
    lock(listDel)
    {
      listDel.Add(ім'я_делегата);
      listData.Add(дані);
    }
  }
  ...
}

Типова функція для делегата:

void CreateChar()
{
  //дані беремо з listData
  ...
}

ну я зрозумів логіку, якщо раніше я додавав в чергу лише дані, а потім витягував дані, запускав метод передаючи ці дані як аргументи, то тепер я маю додавати в одну чергу дані, в іншу чергу метод, а потім витягувати дані, витягувати метод і запускати це все. Ну тіпа таво. Але чи має це сенс? Проблема в тому, що витягуються і обробляються не всі дані, котрі є в черзі, тобто це не залежить від методу, котрий обробляє дані. Проблема саме між додаванням даних в чергу і витягуванням цих даних. Дані приходят, це 100%, вони додаються в чергу, це, майбуть, теж 100%, але вони не витягуються з черги в Update і не обробляются і це не стосується всіх даних, більша частинка обробляєтся, а решта чомусь пропускаєтся. Я тут трошки подумав, Update виконуєтся кожен кадр. Кадрів в середньому 60 в секунду. То можливо саме в цьому проблема? Може дані приходять набагато швидше ніж встигає виконуватись Update? Хоча в моєму випадку це 4096 байтів в секунду, дивно було б, якщо Update не встигав. Коротше, я спробую дослідити, в якому місці дані вислизають.

11

Re: Передача даних між потоками

TRYCUKI_V_KROVI написав:

. Я тут трошки подумав, Update виконуєтся кожен кадр. Кадрів в середньому 60 в секунду. То можливо саме в цьому проблема? Може дані приходять набагато швидше ніж встигає виконуватись Update? Хоча в моєму випадку це 4096 байтів в секунду, дивно було б, якщо Update не встигав. Коротше, я спробую дослідити, в якому місці дані вислизають.

Зверніть увагу на те, як ви витягаєте дані зі списку - і як я. Особливо на перевірку того, скільки елементів в черзі.

12

Re: Передача даних між потоками

koala
У вашому коді головне - це наявність синхронізації за допомогою lock-ів. А що запихати у список, то вже не суттєво. Делегати просто універсальніші і небезпечніші.

13

Re: Передача даних між потоками

Трохи пошукав - можна ще простіше за допомогою лямбд. Основний цикл такий самий, як у мене (тільки без даних), функції всі залишаються ваші, додавання до черги:

void ReceiveCallback(IAsyncResult ar)
{
  ...//отримуємо дані, визначаємо потрібну функцію
  lock(list)
  {
    list.Add(()=>назва_функції(дані));
  }
  ...
}

14

Re: Передача даних між потоками

koala написав:

Трохи пошукав - можна ще простіше за допомогою лямбд. Основний цикл такий самий, як у мене (тільки без даних), функції всі залишаються ваші, додавання до черги:

void ReceiveCallback(IAsyncResult ar)
{
  ...//отримуємо дані, визначаємо потрібну функцію
  lock(list)
  {
    list.Add(()=>назва_функції(дані));
  }
  ...
}

не дігнав на рахунок визначання потрібних функцій, ви маєте наувазі, що треба створити свою функцію під кожну команду? може краще зробити одну здорову функцію, котра вже сама там все робить?

15

Re: Передача даних між потоками

Одна здорова - завжди гірше за багато дрібних.
Ще раз: код лишається майже таким, як і був у вас на початку, тільки обробка в Update змінюється згідно з моїм коментарем від 23:15:48 і замість додавання до списку байтів додаються лямбди (тобто анонімні функції), типу

list.Add(()=>{ExecuteRec(data);});

(фігурні дужки додав для очевидності; ()=> - проголошення анонімної функції, лямбда-функції).

Подякували: Очі.завидющі, FakiNyan2

16 Востаннє редагувалося FakiNyan (05.07.2013 00:29:34)

Re: Передача даних між потоками

koala написав:

Одна здорова - завжди гірше за багато дрібних.
Ще раз: код лишається майже таким, як і був у вас на початку, тільки обробка в Update змінюється згідно з моїм коментарем від 23:15:48 і замість додавання до списку байтів додаються лямбди (тобто анонімні функції), типу

list.Add(()=>{ExecuteRec(data);});

(фігурні дужки додав для очевидності; ()=> - проголошення анонімної функції, лямбда-функції).

а яким тоді буде тип List'a?

17

Re: Передача даних між потоками

koala написав:

Одна здорова - завжди гірше за багато дрібних.
Ще раз: код лишається майже таким, як і був у вас на початку, тільки обробка в Update змінюється згідно з моїм коментарем від 23:15:48 і замість додавання до списку байтів додаються лямбди (тобто анонімні функції), типу

list.Add(()=>{ExecuteRec(data);});

(фігурні дужки додав для очевидності; ()=> - проголошення анонімної функції, лямбда-функції).

чюваак, ти самий геніальний чювак в світі!!! xD все заработало, не знаю, чи надовго це, але зара використав три клієнта і з обома все спрацювало, ну тобто в першого клієнта в сцені з явилось двоє інших

18

Re: Передача даних між потоками

Мені тепер стало трохи цікаво, як воно це реалізовує і оптимізує... Але головне, в будь-якому разі, правило №1: працює - не чіпай!

19

Re: Передача даних між потоками

koala написав:

як воно це реалізовує і оптимізує...

це ви маєте на увазі - як я це все зліпив і оптимізував? ну ліплю я поступово, малими частинами, спочатку вхід в гру, це створення двух полів для логіна і пароля  і кнопки, коли на кнопку натиснули - з'єднуємось с сервером  і відправляємо дані, а сервер вже дивиться, шо з цим робить і відправляє, або не відправляє дані назад. Якщо комбінація логін пароля існує - то сервер повертає id юзера, якщо ні - повертає нуль. А клієнт дивиться, якщо прийшов нуль, то виводим повідомлення про помилку, якщо не нуль, то запамятовуємо id і socket в окремому глобальному статичному класі і завантажуєм сцену нумер два. А в тій сцені беремо id і socket, робимо запит до серверу з вимогою списка персонажів, сервер робить запрос до базі даних, повертає, або не повертає список, якщо персонажі є  - то клієнт рендерить кнопки з іменами персонажів. Також в сцені є ще три кнопки. Створити персонажа, видалити і ввійти в гру. Коли натискаєм на кнопку з персонажем - то заносим його дані в глобальний статичний клас і завантажуєм наступну сцену. В ній, спочатку робимо запит до серверу з проханням надати список персонажів, котрі вже в грі, якщо сервер щось повертає, то ми беремо і створюємо необхідним персонажів та заносимо їх в массив. Потім робимо запит на сервер і кажемо йому таким чином - я активний. Після цього дані про нашого персонажа будуть відсилатися іншим гравцям, якщо вони зайдуть в гру, а наш персонаж буде вже в грі. Ну а якщо ті персонажі вже були в грі, а ми тільки що зайшли, то ми теж створюємось в них в сцені. Ну це тільки що ми робили. От, а потім приходять дані від кожного персонажа про його позицію, а сервер відправляє ці дані всім іншим гравцям, котрі активні. А клієнт дивится до якого гравця ці дані відоносятся, вибирає цього гравця з массиву всіх гравців і присвоює йому нові координати.

20

Re: Передача даних між потоками

Та ні, це було не до вас питання. Ми вчили асемблер і компілятори, і зачіпали оптимізацію, але як сучасні компілятори реалізовують лямбди - для мене - загадка. Якщо робити все в лоб, то там виходить забагато метушні (мало не компілювати кожного разу нову функцію під час виконання програми); мабуть, є десь непогана теорія оптимізації лямбд...