Add RemovableDrive plugin that has been moved from Uranium

Since it now depends on GCodeWriter we should put it somewhere where
GCodeWriter actually exists.
This commit is contained in:
Arjen Hiemstra 2015-07-30 17:14:22 +02:00
parent 3e024e1618
commit 825349b47b
6 changed files with 395 additions and 0 deletions

View file

@ -0,0 +1,41 @@
# Copyright (c) 2015 Ultimaker B.V.
# Copyright (c) 2013 David Braam
# Uranium is released under the terms of the AGPLv3 or higher.
from . import RemovableDrivePlugin
import glob
import os
import subprocess
## Support for removable devices on Linux.
#
# TODO: This code uses the most basic interfaces for handling this.
# We should instead use UDisks2 to handle mount/unmount and hotplugging events.
#
class LinuxRemovableDrivePlugin(RemovableDrivePlugin.RemovableDrivePlugin):
def checkRemovableDrives(self):
drives = {}
for volume in glob.glob("/media/*"):
if os.path.ismount(volume):
drives[volume] = os.path.basename(volume)
elif volume == "/media/"+os.getenv("USER"):
for volume in glob.glob("/media/"+os.getenv("USER")+"/*"):
if os.path.ismount(volume):
drives[volume] = os.path.basename(volume)
for volume in glob.glob("/run/media/" + os.getenv("USER") + "/*"):
if os.path.ismount(volume):
drives[volume] = os.path.basename(volume)
return drives
def performEjectDevice(self, device):
p = subprocess.Popen(["umount", device.getId()], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output = p.communicate()
return_code = p.wait()
if return_code != 0:
return False
else:
return True

View file

@ -0,0 +1,64 @@
# Copyright (c) 2015 Ultimaker B.V.
# Copyright (c) 2013 David Braam
# Uranium is released under the terms of the AGPLv3 or higher.
from . import RemovableDrivePlugin
import threading
import subprocess
import time
import os
import plistlib
## Support for removable devices on Mac OSX
class OSXRemovableDrives(RemovableDrivePlugin.RemovableDrivePlugin):
def run(self):
drives = {}
p = subprocess.Popen(["system_profiler", "SPUSBDataType", "-xml"], stdout=subprocess.PIPE)
plist = plistlib.loads(p.communicate()[0])
p.wait()
for dev in self._findInTree(plist, "Mass Storage Device"):
if "removable_media" in dev and dev["removable_media"] == "yes" and "volumes" in dev and len(dev["volumes"]) > 0:
for vol in dev["volumes"]:
if "mount_point" in vol:
volume = vol["mount_point"]
drives[volume] = os.path.basename(volume)
p = subprocess.Popen(["system_profiler", "SPCardReaderDataType", "-xml"], stdout=subprocess.PIPE)
plist = plistlib.loads(p.communicate()[0])
p.wait()
for entry in plist:
if "_items" in entry:
for item in entry["_items"]:
for dev in item["_items"]:
if "removable_media" in dev and dev["removable_media"] == "yes" and "volumes" in dev and len(dev["volumes"]) > 0:
for vol in dev["volumes"]:
if "mount_point" in vol:
volume = vol["mount_point"]
drives[volume] = os.path.basename(volume)
def performEjectDevice(self, device):
p = subprocess.Popen(["diskutil", "eject", path], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output = p.communicate()
return_code = p.wait()
if return_code != 0:
return False
else:
return True
def _findInTree(self, t, n):
ret = []
if type(t) is dict:
if "_name" in t and t["_name"] == n:
ret.append(t)
for k, v in t.items():
ret += self._findInTree(v, n)
if type(t) is list:
for v in t:
ret += self._findInTree(v, n)
return ret

View file

@ -0,0 +1,87 @@
import os.path
from UM.Application import Application
from UM.Logger import Logger
from UM.Message import Message
from UM.Mesh.WriteMeshJob import WriteMeshJob
from UM.Mesh.MeshWriter import MeshWriter
from UM.Scene.Iterator.BreadthFirstIterator import BreadthFirstIterator
from UM.OutputDevice.OutputDevice import OutputDevice
from UM.OutputDevice import OutputDeviceError
from UM.i18n import i18nCatalog
catalog = i18nCatalog("uranium")
class RemovableDriveOutputDevice(OutputDevice):
def __init__(self, device_id, device_name):
super().__init__(device_id)
self.setName(device_name)
self.setShortDescription(catalog.i18nc("", "Save to Removable Drive"))
self.setDescription(catalog.i18nc("", "Save to Removable Drive {0}").format(device_name))
self.setIconName("save_sd")
self.setPriority(1)
def requestWrite(self, node):
gcode_writer = Application.getInstance().getMeshFileHandler().getWriterByMimeType("text/x-gcode")
if not gcode_writer:
Logger.log("e", "Could not find GCode writer, not writing to removable drive %s", self.getName())
raise OutputDeviceError.WriteRequestFailedError()
file_name = None
for n in BreadthFirstIterator(node):
if n.getMeshData():
file_name = n.getName()
if file_name:
break
if not file_name:
Logger.log("e", "Could not determine a proper file name when trying to write to %s, aborting", self.getName())
raise OutputDeviceError.WriteRequestFailedError()
file_name = os.path.join(self.getId(), os.path.splitext(file_name)[0] + ".gcode")
try:
Logger.log("d", "Writing to %s", file_name)
stream = open(file_name, "wt")
job = WriteMeshJob(gcode_writer, stream, node, MeshWriter.OutputMode.TextMode)
job.setFileName(file_name)
job.progress.connect(self._onProgress)
job.finished.connect(self._onFinished)
message = Message(catalog.i18nc("", "Saving to Removable Drive {0}").format(self.getName()), 0, False, -1)
message.show()
job._message = message
job.start()
except PermissionError as e:
raise OutputDeviceError.PermissionDeniedError() from e
except OSError as e:
raise OutputDeviceError.WriteRequestFailedError() from e
def _onProgress(self, job, progress):
if hasattr(job, "_message"):
job._message.setProgress(progress)
self.writeProgress.emit(self, progress)
def _onFinished(self, job):
if hasattr(job, "_message"):
job._message.hide()
job._message = None
self.writeFinished.emit(self)
if job.getResult():
message = Message(catalog.i18nc("", "Saved to Removable Drive {0} as {1}").format(self.getName(), os.path.basename(job.getFileName())))
message.addAction("eject", catalog.i18nc("", "Eject"), "eject", catalog.i18nc("", "Eject removable device {0}").format(self.getName()))
message.actionTriggered.connect(self._onActionTriggered)
message.show()
self.writeSuccess.emit(self)
else:
message = Message(catalog.i18nc("", "Could not save to removable drive {0}: {1}").format(self.getName(), str(job.getError())))
message.show()
self.writeError.emit(self)
job.getStream().close()
def _onActionTriggered(self, message, action):
if action == "eject":
Application.getInstance().getOutputDeviceManager().getOutputDevicePlugin("RemovableDriveOutputDevice").ejectDevice(self)

View file

@ -0,0 +1,73 @@
# Copyright (c) 2015 Ultimaker B.V.
# Uranium is released under the terms of the AGPLv3 or higher.
import threading
import time
from UM.Signal import Signal
from UM.Message import Message
from UM.OutputDevice.OutputDevicePlugin import OutputDevicePlugin
from . import RemovableDriveOutputDevice
from UM.i18n import i18nCatalog
catalog = i18nCatalog("uranium")
class RemovableDrivePlugin(OutputDevicePlugin):
def __init__(self):
super().__init__()
self._update_thread = threading.Thread(target = self._updateThread)
self._update_thread.setDaemon(True)
self._check_updates = True
self._drives = {}
def start(self):
self._update_thread.start()
def stop(self):
self._check_updates = False
self._update_thread.join()
self._addRemoveDrives({})
def checkRemovableDrives(self):
raise NotImplementedError()
def ejectDevice(self, device):
result = self.performEjectDevice(device)
if result:
message = Message(catalog.i18n("Ejected {0}. You can now safely remove the drive.").format(device.getName()))
message.show()
else:
message = Message(catalog.i18n("Failed to eject {0}. Maybe it is still in use?").format(device.getName()))
message.show()
def performEjectDevice(self, device):
raise NotImplementedError()
def _updateThread(self):
while self._check_updates:
result = self.checkRemovableDrives()
self._addRemoveDrives(result)
time.sleep(5)
def _addRemoveDrives(self, drives):
# First, find and add all new or changed keys
for key, value in drives.items():
if key not in self._drives:
self.getOutputDeviceManager().addOutputDevice(RemovableDriveOutputDevice.RemovableDriveOutputDevice(key, value))
continue
if self._drives[key] != value:
self.getOutputDeviceManager().removeOutputDevice(key)
self.getOutputDeviceManager().addOutputDevice(RemovableDriveOutputDevice.RemovableDriveOutputDevice(key, value))
# Then check for keys that have been removed
for key in self._drives.keys():
if key not in drives:
self.getOutputDeviceManager().removeOutputDevice(key)
self._drives = drives

View file

@ -0,0 +1,98 @@
# Copyright (c) 2015 Ultimaker B.V.
# Copyright (c) 2013 David Braam
# Uranium is released under the terms of the AGPLv3 or higher.
from . import RemovableDrivePlugin
import threading
import string
from ctypes import windll
from ctypes import wintypes
import ctypes
import time
import os
import subprocess
from UM.i18n import i18nCatalog
catalog = i18nCatalog("uranium")
# WinAPI Constants that we need
# Hardcoded here due to stupid WinDLL stuff that does not give us access to these values.
DRIVE_REMOVABLE = 2
GENERIC_READ = 2147483648
GENERIC_WRITE = 1073741824
FILE_SHARE_READ = 1
FILE_SHARE_WRITE = 2
IOCTL_STORAGE_EJECT_MEDIA = 2967560
OPEN_EXISTING = 3
## Removable drive support for windows
class WindowsRemovableDrivePlugin(RemovableDrivePlugin.RemovableDrivePlugin):
def checkRemovableDrives(self):
drives = {}
bitmask = windll.kernel32.GetLogicalDrives()
# Check possible drive letters, from A to Z
# Note: using ascii_uppercase because we do not want this to change with locale!
for letter in string.ascii_uppercase:
drive = "{0}:/".format(letter)
# Do we really want to skip A and B?
# GetDriveTypeA explicitly wants a byte array of type ascii. It will accept a string, but this wont work
if bitmask & 1 and windll.kernel32.GetDriveTypeA(drive.encode("ascii")) == DRIVE_REMOVABLE:
volume_name = ""
name_buffer = ctypes.create_unicode_buffer(1024)
filesystem_buffer = ctypes.create_unicode_buffer(1024)
error = windll.kernel32.GetVolumeInformationW(ctypes.c_wchar_p(drive), name_buffer, ctypes.sizeof(name_buffer), None, None, None, filesystem_buffer, ctypes.sizeof(filesystem_buffer))
if error != 0:
volume_name = name_buffer.value
if not volume_name:
volume_name = catalog.i18nc("Default name for removable device", "Removable Drive")
# Certain readers will report themselves as a volume even when there is no card inserted, but will show an
# "No volume in drive" warning when trying to call GetDiskFreeSpace. However, they will not report a valid
# filesystem, so we can filter on that. In addition, this excludes other things with filesystems Windows
# does not support.
if filesystem_buffer.value == "":
continue
# Check for the free space. Some card readers show up as a drive with 0 space free when there is no card inserted.
freeBytes = ctypes.c_longlong(0)
if windll.kernel32.GetDiskFreeSpaceExA(drive.encode("ascii"), ctypes.byref(freeBytes), None, None) == 0:
continue
if freeBytes.value < 1:
continue
drives[drive] = "{0} ({1}:)".format(volume_name, letter)
bitmask >>= 1
return drives
def performEjectDevice(self, device):
# Magic WinAPI stuff
# First, open a handle to the Device
handle = windll.kernel32.CreateFileA("\\\\.\\{0}".format(device.getId()[:-1]).encode("ascii"), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, None, OPEN_EXISTING, 0, None )
if handle == -1:
print(windll.kernel32.GetLastError())
return
result = None
# Then, try and tell it to eject
if not windll.kernel32.DeviceIoControl(handle, IOCTL_STORAGE_EJECT_MEDIA, None, None, None, None, None, None):
result = False
else:
result = True
# Finally, close the handle
windll.kernel32.CloseHandle(handle)
return result

View file

@ -0,0 +1,32 @@
# Copyright (c) 2015 Ultimaker B.V.
# Uranium is released under the terms of the AGPLv3 or higher.
import platform
from UM.i18n import i18nCatalog
catalog = i18nCatalog("uranium")
def getMetaData():
return {
"plugin": {
"name": catalog.i18nc("Removable Drive Output Device Plugin name", "Removable Drive Output Device Plugin"),
"author": "Ultimaker B.V.",
"description": catalog.i18nc("Removable Drive Output Device Plugin description", "Provides removable drive hotplugging and writing support"),
"version": "1.0",
"api": 2
}
}
def register(app):
if platform.system() == "Windows":
from . import WindowsRemovableDrivePlugin
return { "output_device": WindowsRemovableDrivePlugin.WindowsRemovableDrivePlugin() }
elif platform.system() == "Darwin":
from . import OSXRemovableDrivePlugin
return { "output_device": OSXRemovableDrivePlugin.OSXRemovableDrivePlugin() }
elif platform.system() == "Linux":
from . import LinuxRemovableDrivePlugin
return { "output_device": LinuxRemovableDrivePlugin.LinuxRemovableDrivePlugin() }
else:
Logger.log("e", "Unsupported system %s, no removable device hotplugging support available.", platform.system())
return { }