JUCE-7.0.12-0-g4f43011b96 JUCE-7.0.12-0-g4f43011b96
JUCE — C++ application framework with support for VST, VST3, LV2 audio plug-ins

« « « Anklang Documentation
Loading...
Searching...
No Matches
juce_InterprocessConnection.cpp
Go to the documentation of this file.
1 /*
2 ==============================================================================
3
4 This file is part of the JUCE library.
5 Copyright (c) 2022 - Raw Material Software Limited
6
7 JUCE is an open source library subject to commercial or open-source
8 licensing.
9
10 The code included in this file is provided under the terms of the ISC license
11 http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12 To use, copy, modify, and/or distribute this software for any purpose with or
13 without fee is hereby granted provided that the above copyright notice and
14 this permission notice appear in all copies.
15
16 JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17 EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18 DISCLAIMED.
19
20 ==============================================================================
21*/
22
23namespace juce
24{
25
27{
28 ConnectionThread (InterprocessConnection& c) : Thread ("JUCE IPC"), owner (c) {}
29 void run() override { owner.runThread(); }
30
33};
34
36{
37public:
39 : ref (p) {}
40
41 template <typename Fn>
42 void ifSafe (Fn&& fn)
43 {
44 const ScopedLock lock (mutex);
45
46 if (safe)
47 fn (ref);
48 }
49
50 void setSafe (bool s)
51 {
52 const ScopedLock lock (mutex);
53 safe = s;
54 }
55
56 bool isSafe()
57 {
58 const ScopedLock lock (mutex);
59 return safe;
60 }
61
62private:
63 CriticalSection mutex;
65 bool safe = false;
66};
67
69{
70 using SafeActionImpl::SafeActionImpl;
71};
72
73//==============================================================================
75 : useMessageThread (callbacksOnMessageThread),
76 magicMessageHeader (magicMessageHeaderNumber),
77 safeAction (std::make_shared<SafeAction> (*this))
78{
79 thread.reset (new ConnectionThread (*this));
80}
81
83{
84 // You *must* call `disconnect` in the destructor of your derived class to ensure
85 // that any pending messages are not delivered. If the messages were delivered after
86 // destroying the derived class, we'd end up calling the pure virtual implementations
87 // of `messageReceived`, `connectionMade` and `connectionLost` which is definitely
88 // not a good idea!
89 jassert (! safeAction->isSafe());
90
91 callbackConnectionState = false;
92 disconnect (4000, Notify::no);
93 thread.reset();
94}
95
96//==============================================================================
98 int portNumber, int timeOutMillisecs)
99{
100 disconnect();
101
102 auto s = std::make_unique<StreamingSocket>();
103
104 if (s->connect (hostName, portNumber, timeOutMillisecs))
105 {
106 const ScopedWriteLock sl (pipeAndSocketLock);
107 initialiseWithSocket (std::move (s));
108 return true;
109 }
110
111 return false;
112}
113
115{
116 disconnect();
117
118 auto newPipe = std::make_unique<NamedPipe>();
119
120 if (newPipe->openExisting (pipeName))
121 {
122 const ScopedWriteLock sl (pipeAndSocketLock);
123 pipeReceiveMessageTimeout = timeoutMs;
124 initialiseWithPipe (std::move (newPipe));
125 return true;
126 }
127
128 return false;
129}
130
132{
133 disconnect();
134
135 auto newPipe = std::make_unique<NamedPipe>();
136
137 if (newPipe->createNewPipe (pipeName, mustNotExist))
138 {
139 const ScopedWriteLock sl (pipeAndSocketLock);
140 pipeReceiveMessageTimeout = timeoutMs;
141 initialiseWithPipe (std::move (newPipe));
142 return true;
143 }
144
145 return false;
146}
147
148void InterprocessConnection::disconnect (int timeoutMs, Notify notify)
149{
150 thread->signalThreadShouldExit();
151
152 {
153 const ScopedReadLock sl (pipeAndSocketLock);
154 if (socket != nullptr) socket->close();
155 if (pipe != nullptr) pipe->close();
156 }
157
158 thread->stopThread (timeoutMs);
159 deletePipeAndSocket();
160
161 if (notify == Notify::yes)
162 connectionLostInt();
163
164 callbackConnectionState = false;
165 safeAction->setSafe (false);
166}
167
168void InterprocessConnection::deletePipeAndSocket()
169{
170 const ScopedWriteLock sl (pipeAndSocketLock);
171 socket.reset();
172 pipe.reset();
173}
174
176{
177 const ScopedReadLock sl (pipeAndSocketLock);
178
179 return ((socket != nullptr && socket->isConnected())
180 || (pipe != nullptr && pipe->isOpen()))
181 && threadIsRunning;
182}
183
185{
186 {
187 const ScopedReadLock sl (pipeAndSocketLock);
188
189 if (pipe == nullptr && socket == nullptr)
190 return {};
191
192 if (socket != nullptr && ! socket->isLocal())
193 return socket->getHostName();
194 }
195
196 return IPAddress::local().toString();
197}
198
199//==============================================================================
201{
202 uint32 messageHeader[2] = { ByteOrder::swapIfBigEndian (magicMessageHeader),
204
205 MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
206 messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
207 messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
208
209 return writeData (messageData.getData(), (int) messageData.getSize()) == (int) messageData.getSize();
210}
211
212int InterprocessConnection::writeData (void* data, int dataSize)
213{
214 const ScopedReadLock sl (pipeAndSocketLock);
215
216 if (socket != nullptr)
217 return socket->write (data, dataSize);
218
219 if (pipe != nullptr)
220 return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
221
222 return 0;
223}
224
225//==============================================================================
// Common start-up path used by initialiseWithSocket() and initialiseWithPipe().
// In order: enables callbacks through safeAction, flags the thread as running,
// reports the new connection via connectionMadeInt(), then starts the
// connection thread.
226void InterprocessConnection::initialise()
227{
228 safeAction->setSafe (true);
229 threadIsRunning = true;
230 connectionMadeInt();
231 thread->startThread();
232}
233
234void InterprocessConnection::initialiseWithSocket (std::unique_ptr<StreamingSocket> newSocket)
235{
236 jassert (socket == nullptr && pipe == nullptr);
237 socket = std::move (newSocket);
238 initialise();
239}
240
241void InterprocessConnection::initialiseWithPipe (std::unique_ptr<NamedPipe> newPipe)
242{
243 jassert (socket == nullptr && pipe == nullptr);
244 pipe = std::move (newPipe);
245 initialise();
246}
247
248//==============================================================================
250{
252 : safeAction (ipc), connectionMade (connected)
253 {}
254
255 void messageCallback() override
256 {
257 safeAction->ifSafe ([this] (InterprocessConnection& owner)
258 {
259 if (connectionMade)
260 owner.connectionMade();
261 else
262 owner.connectionLost();
263 });
264 }
265
267 bool connectionMade;
268
270};
271
272void InterprocessConnection::connectionMadeInt()
273{
274 if (! callbackConnectionState)
275 {
276 callbackConnectionState = true;
277
278 if (useMessageThread)
279 (new ConnectionStateMessage (safeAction, true))->post();
280 else
282 }
283}
284
285void InterprocessConnection::connectionLostInt()
286{
287 if (callbackConnectionState)
288 {
289 callbackConnectionState = false;
290
291 if (useMessageThread)
292 (new ConnectionStateMessage (safeAction, false))->post();
293 else
295 }
296}
297
299{
301 : safeAction (ipc), data (d)
302 {}
303
304 void messageCallback() override
305 {
306 safeAction->ifSafe ([this] (InterprocessConnection& owner)
307 {
308 owner.messageReceived (data);
309 });
310 }
311
313 MemoryBlock data;
314};
315
316void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
317{
318 jassert (callbackConnectionState);
319
320 if (useMessageThread)
321 (new DataDeliveryMessage (safeAction, data))->post();
322 else
323 messageReceived (data);
324}
325
326//==============================================================================
327int InterprocessConnection::readData (void* data, int num)
328{
329 const ScopedReadLock sl (pipeAndSocketLock);
330
331 if (socket != nullptr)
332 return socket->read (data, num, true);
333
334 if (pipe != nullptr)
335 return pipe->read (data, num, pipeReceiveMessageTimeout);
336
338 return -1;
339}
340
341bool InterprocessConnection::readNextMessage()
342{
344 auto bytes = readData (messageHeader, sizeof (messageHeader));
345
346 if (bytes == (int) sizeof (messageHeader)
347 && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
348 {
350
351 if (bytesInMessage > 0)
352 {
353 MemoryBlock messageData ((size_t) bytesInMessage, true);
354 int bytesRead = 0;
355
356 while (bytesInMessage > 0)
357 {
358 if (thread->threadShouldExit())
359 return false;
360
361 auto numThisTime = jmin (bytesInMessage, 65536);
362 auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
363
364 if (bytesIn <= 0)
365 break;
366
367 bytesRead += bytesIn;
369 }
370
371 if (bytesRead >= 0)
372 deliverDataInt (messageData);
373 }
374
375 return true;
376 }
377
378 if (bytes < 0)
379 {
380 if (socket != nullptr)
381 deletePipeAndSocket();
382
383 connectionLostInt();
384 }
385
386 return false;
387}
388
389void InterprocessConnection::runThread()
390{
391 while (! thread->threadShouldExit())
392 {
393 if (socket != nullptr)
394 {
395 auto ready = socket->waitUntilReady (true, 100);
396
397 if (ready < 0)
398 {
399 deletePipeAndSocket();
400 connectionLostInt();
401 break;
402 }
403
404 if (ready == 0)
405 {
406 thread->wait (1);
407 continue;
408 }
409 }
410 else if (pipe != nullptr)
411 {
412 if (! pipe->isOpen())
413 {
414 deletePipeAndSocket();
415 connectionLostInt();
416 break;
417 }
418 }
419 else
420 {
421 break;
422 }
423
424 if (thread->threadShouldExit() || ! readNextMessage())
425 break;
426 }
427
428 threadIsRunning = false;
429}
430
431} // namespace juce
static Type swapIfBigEndian(Type value) noexcept
Swaps the byte order of a signed or unsigned integer if the CPU is big-endian.
Automatically locks and unlocks a mutex object.
static IPAddress local(bool IPv6=false) noexcept
Returns an IPv4 or IPv6 address meaning "localhost", equivalent to 127.0.0.1 (IPv4) or ::1 (IPv6)
String toString() const
Returns a dot- or colon-separated string in the form "1.2.3.4" (IPv4) or "1:2:3:4:5:6:7:8" (IPv6).
Manages a simple two-way messaging connection to another process, using either a socket or a named pipe as the transport medium.
virtual void connectionMade()=0
Called when the connection is first connected.
virtual void messageReceived(const MemoryBlock &message)=0
Called when a message arrives.
void disconnect(int timeoutMs=-1, Notify notify=Notify::yes)
Disconnects and closes any currently-open sockets or pipes.
String getConnectedHostName() const
Returns the name of the machine at the other end of this connection.
InterprocessConnection(bool callbacksOnMessageThread=true, uint32 magicMessageHeaderNumber=0xf2b49e2c)
Creates a connection.
bool createPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist=false)
Tries to create a new pipe for other processes to connect to.
bool connectToPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs)
Tries to connect the object to an existing named pipe.
bool isConnected() const
True if a socket or pipe is currently active.
Notify
Whether the disconnect call should trigger callbacks.
virtual void connectionLost()=0
Called when the connection is broken.
bool sendMessage(const MemoryBlock &message)
Tries to send a message to the other end of this connection.
bool connectToSocket(const String &hostName, int portNumber, int timeOutMillisecs)
Tries to connect this object to a socket.
A class to hold a resizable block of raw data.
void * getData() noexcept
Returns a void pointer to the data.
size_t getSize() const noexcept
Returns the block's current allocated size, in bytes.
Internal class used as the base class for all message objects.
The base class for objects that can be sent to a MessageListener.
Automatically locks and unlocks a ReadWriteLock object.
Automatically locks and unlocks a ReadWriteLock object.
The JUCE String class!
Definition juce_String.h:53
Encapsulates a thread.
Definition juce_Thread.h:43
#define jassert(expression)
Platform-independent assertion macro.
#define JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR(className)
This is a shorthand way of writing both a JUCE_DECLARE_NON_COPYABLE and JUCE_LEAK_DETECTOR macro for a class.
#define jassertfalse
This will always cause an assertion failure.
typedef int
JUCE Namespace.
constexpr Type jmin(Type a, Type b)
Returns the smaller of two values.
Type * addBytesToPointer(Type *basePointer, IntegerType bytes) noexcept
A handy function which adds a number of bytes to any type of pointer and returns the result.
Type unalignedPointerCast(void *ptr) noexcept
Casts a pointer to another type via void*, which suppresses the cast-align warning which sometimes arises when casting pointers to types with different alignment.
Definition juce_Memory.h:88
unsigned int uint32
A platform-independent 32-bit unsigned integer type.
void run() override
Must be implemented to perform the thread's actual code.