Библиотека Zero MQ (Android)

Библиотека ZeroMQ поддерживает множество языков программирвоания (при помощи bind'ов): СИ, С++, Java, Python, Rust и т.д. (по ссылке можно найти полный список). Прежде чем начать разбираться с ZeroMQ, давайте сначала посмотрим на общую концепцию очереди сообщений.

Основы ZMQ

Простота

ZeroMQ простая. Мы можем делать некоторые асинхронные операции ввода/вывода, также ZeroMQ может ставить в очередь сообщений сообщения из потока ввода/вывода. Потоки ввода/вывода в ZeroMQ работают асинхронно при общении с сетевым трафиком, поэтому они могут делать для вас и остальную работу. Если вы прежде работали с сокетами, то вы должны знать, что это весьма не просто. Тем не менее, ZeroMQ позволяет гораздо упростить работу с ними.

Производительность

ZeroMQ быстрая. Веб-сайту Second Life удалось получить 13,4 микросекунды непрерывного времени ожидания и до 4 100 000 сообщений в секунду. ZeroMQ может использовать многоадресный транспортный протокол, который является эффективным методом для передачи данных в различных направлениях.

Шаблон “Запрос - Ответ”

ZMQ_REP - ZMQ_REQ

На данном этапе мы рассмотрим связку клиент - сервер при помощи схемы “запрос - ответ” или ZMQ_REQ - ZMQ_REP.

1758001795757

Напишем простое клиент-серверное приложение на языке Clang: server.c

#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include "zmq.h"

int main (int argc, char const *argv[]) 
{
	void* context = zmq_ctx_new();
	void* respond = zmq_socket(context, ZMQ_REP);
	
	zmq_bind(respond, "tcp://*:4040");
	printf("Starting…\n");
	
	for(;;) 
	{
		zmq_msg_t request;
		zmq_msg_init(&request);
		zmq_msg_recv(&request, respond, 0);
		printf("Received: hello\n");
		zmq_msg_close(&request);
		sleep(1); // sleep one second
		zmq_msg_t reply;
		zmq_msg_init_size(&reply, strlen("world"));
		memcpy(zmq_msg_data(&reply), "world", 5);
		zmq_msg_send(&reply, respond, 0);
		zmq_msg_close(&reply);
	}
	zmq_close(respond);
	zmq_ctx_destroy(context);
	
	return 0;
}

client.c

#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include "zmq.h"

int main (int argc, char const *argv[]) 
{
	void* context = zmq_ctx_new();
	printf("Client Starting….\n");
	
	void* request = zmq_socket(context, ZMQ_REQ);
	zmq_connect(request, "tcp://localhost:4040");
	
	int count = 0;
	
	for(;;) 
	{
		zmq_msg_t req;
		zmq_msg_init_size(&req, strlen("hello"));
		memcpy(zmq_msg_data(&req), "hello", 5);
		printf("Sending: hello - %d\n", count);
		zmq_msg_send(&req, request, 0);
		zmq_msg_close(&req);
		zmq_msg_t reply;
		zmq_msg_init(&reply);
		zmq_msg_recv(&reply, request, 0);
		printf("Received: hello - %d\n", count);
		zmq_msg_close(&reply);
		count++;
	}
	// We never get here though.
	zmq_close(request);
	zmq_ctx_destroy(context);
	
	return 0;
}

Комментарии к серверной части:

  1. Метод zmq_ctx_new() - создает новый контекст. Он является потоко-безопасным, так что контекст можно использовать в нескольких потоках;

  2. zmq_socket(context, ZMQ_REP) - создает новый сокет в определенном контексте. Сокеты ZeroMQ не являются потоко-безопасными, поэтому они должны использоваться только в том потоке, в котором они были созданы. Традиционные сокеты синхронные, в то время как у сокетов ZeroMQ есть возможность создавать одну очередь на стороне клиента, а другую на стороне сервера, асинхронно управляя паттерном запрос-ответ. ZeroMQ автоматически организует настройку соединения, повторное подключение, отключение и доставку контента;

  3. zmq_bind(respond, "tcp://*:4040") - связывает сокет и блокирует порт 4040, таким образом сервер начинает свою работу;

  4. zmq_msg_recv(&request, respond, 0) - получение сообщения от client;

  5. zmq_msg_send(&reply, respond, 0) - отправка ответа клиенту.

Комментарии к клиентской части:

  1. Метод zmq_ctx_new() - создает новый контекст. Он является потоко-безопасным, так что контекст можно использовать в нескольких потоках;

  2. zmq_socket(context, ZMQ_REQ) - создает новый сокет в определенном контексте. ZMQ_REQ - это метод запроса на передачу данных.

  3. zmq_connect(request, "tcp://localhost:4040") - установка соединения с сервеной частью, запрос доступа на порт 4040 на localhost;

  4. zmq_msg_send(&req, request, 0) - отправка сообщения;

  5. zmq_msg_recv(&reply, request, 0) - получение ответа от сервера.

Немного потоки в Android (Thread)

Многопоточность — возможность программы выполнять несколько задач одновременно в рамках одного процесса. Применение фоновых потоков — необходимое условие, если вы хотите избежать появления диалогового окна для принудительного закрытия приложения. Когда активность в Android на протяжении 5 секунд не отвечает на события пользовательского ввода (например, нажатие кнопки) или приёмник широковещательных намерений (Intents) не завершает работу обработчика onReceive() в течение 10 секунд, считается, что приложение зависло. Подобные ситуации следует избегать любой ценой. Используйте фоновые потоки для всех трудоёмких операций, включая работу с файлами, сетевые запросы, транзакции в базах данных и сложные вычисления. Потоки — средство, которое помогает организовать одновременное выполнение нескольких задач, каждой в независимом потоке. Потоки представляют собой экземпляры классов, каждый из которых запускается и функционирует самостоятельно, автономно (или относительно автономно) от главного потока выполнения программы.

Проблема классических Thread в невозможности получить доступ к объектам\обработчикам основного графического потока (UI thread). Существует пара правил по работе с потоками:

  1. НЕ блокировать основной поток;

  2. НЕ пытайтесь получить доступ к Android UI Tooklit вне UI Thread.

Android предоставляет несколько вариантов работы в фоновом режиме безопасно с UI-потоком:

  1. Activity.runOnUiThread(Runnable)

  2. View.post(Runnable)

  3. View.postDelayed(Runnable, long)

  4. Handlers - начнем с с этого

  5. AsyncTask

Пример с View.post(Runnable):

fun onClick(v: View) {
    Thread(Runnable {
        //DO something
        while(1){
            println("Hello World!")
        }
    }).start()
}

Жизненный цикл потока

При выполнении программы объект класса Thread может быть в одном из четырех основных состояний: «новый», «работоспособный», «неработоспособный» и «пассивный». При создании потока он получает состояние «новый» (NEW) и не выполняется. Для перевода потока из состояния «новый» в состояние «работоспособный» (RUNNABLE) следует выполнить метод start(), который вызывает метод run() — основной метод потока.

Поток может находиться в одном из состояний, соответствующих элементам статически вложенного перечисления Thread.State:

  • NEW — поток создан, но еще не запущен;

  • RUNNABLE — поток выполняется;

  • BLOCKED — поток блокирован;

  • WAITING — поток ждет окончания работы другого потока;

  • TIMED_WAITING — поток некоторое время ждет окончания другого потока;

  • TERMINATED — поток завершен.

Инициализация потока

fun sayHello() {
    for (i in 0..10){
        println("Hello World!")
    }
}
val runnableServer = Runnable{sayHello()} 
val threadServer = Thread(runnableServer)
threadServer.start()

Подключение ZMQ (JeroMQ) в проект Android

Для установки библиотеки JeroMQ вам понадобится добавить зависимость в файл Gradle Scripts -> build.gradle.kts (Module :app) (как показано на рисунке ниже):

add_jeromq_build_gradle.png

Добавляем зависимость (implementation ("org.zeromq:jeromq:0.5.0") ):

dependencies {

    implementation(libs.androidx.core.ktx)
    implementation(libs.androidx.appcompat)
    implementation(libs.material)
    implementation(libs.androidx.activity)
    implementation(libs.androidx.constraintlayout)
    implementation("com.google.android.gms:play-services-location:21.2.0") // -->> Location
    implementation ("org.zeromq:jeromq:0.5.0") // -->> JeroMQ
    testImplementation(libs.junit)
    androidTestImplementation(libs.androidx.junit)
    androidTestImplementation(libs.androidx.espresso.core)
}

Далее, IDE Android Studio при фиксации новой зависимости предложит вам синхронизировать все зависимости проекта:

gradle_sync.png

Клиент и Сервер Рассмотрим пример работы Клиента и Сервера в одном Activity. Для этого создадим новое Activity, например, SocketsActivity, в котором определим фукнции работы клиента и сервера при помощи ZMQ.

Код onCreate(): В данном случае добавим одно TextView и Handler для связи потоков с основным потоком UI Thread.

class SocketsActivity : AppCompatActivity() {
    private var log_tag : String = "MY_LOG_TAG"

    private lateinit var tvSockets: TextView
    private var textString : String = ""
    private lateinit var handler: Handler
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        enableEdgeToEdge()
        setContentView(R.layout.activity_sockets)
        ViewCompat.setOnApplyWindowInsetsListener(findViewById(R.id.main)) { v, insets ->
            val systemBars = insets.getInsets(WindowInsetsCompat.Type.systemBars())
            v.setPadding(systemBars.left, systemBars.top, systemBars.right, systemBars.bottom)
            insets
        }
        tvSockets = findViewById(R.id.tvSockets)
        handler = Handler(Looper.getMainLooper())
    }
}

Код функции server:

fun startServer() {
    val context = ZMQ.context(1)
    val socket = ZContext().createSocket(SocketType.REP)
    socket.bind("tcp://*:2222") // Replace with your server's IP and port
    var counter: Int = 0

    // В бесконечном цикле ожидаем "привета" от клиента и отвечаем ему, если нужно
    while(true){
        counter++
        // Получаем данные от Клиента
        val requestBytes = socket.recv(0)
        val request = String(requestBytes, ZMQ.CHARSET)
        println("[SERVER] Received request: [$request]")

        // Здесь имитируется какая-то сложная работа
        handler.postDelayed({
            tvSockets.text = "Received MSG from Client = $counter"
        }, 0) // Это безопасная работа с основным потоком UI Thread
        Thread.sleep(1000)
        // Закончили очень сложную работу

        // Подготавливаем и отправляем ответ Клиенту
        val response = "Hello from Android ZMQ Server!"
        socket.send(response.toByteArray(ZMQ.CHARSET), 0)
        println("[SERVER] Sent reply: [$response]")
    }
    // Безопасно закрываем контекст и сокет.
    socket.close();
    context.close();
}

Код функции client:

fun startClient() {
    val context = ZMQ.context(1)
    val socket = ZContext().createSocket(SocketType.REQ)
    socket.connect("tcp://localhost:2222") // Запрос на соединение с сервером
    val request = "Hello from Android client!"
    for(i in 0..10){
        socket.send(request.toByteArray(ZMQ.CHARSET), 0)
        Log.d(log_tag, "[CLIENT] SendT: $request")

        val reply = socket.recv(0)
        Log.d(log_tag, "[CLIENT] Received: " + String(reply, ZMQ.CHARSET))
    }
    socket.close()
    context.close()
}

Код запуска двух потоков (клиента и сервера): Запускать будем в методе onResume(), когда пользователь видит окно Activity.

 override fun onResume() {
    super.onResume()
    val runnableServer = Runnable{startServer()}
    val threadServer = Thread(runnableServer)
    threadServer.start()

    Thread.sleep(1000)

    val runnableClient = Runnable{startClient()}
    val threadClient = Thread(runnableClient)
    threadClient.start()
}